Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat_: dial, drop and retrieve connected peers #6013

Merged
merged 13 commits into from
Oct 30, 2024
138 changes: 99 additions & 39 deletions wakuv2/nwaku.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,20 @@ package wakuv2
resp) );
}

static void cGoWakuDialPeer(void* wakuCtx,
char* peerMultiAddr,
char* protocol,
int timeoutMs,
void* resp) {

WAKU_CALL( waku_dial_peer(wakuCtx,
peerMultiAddr,
protocol,
timeoutMs,
(WakuCallBack) callback,
resp) );
}

static void cGoWakuDialPeerById(void* wakuCtx,
char* peerId,
char* protocol,
Expand Down Expand Up @@ -220,10 +234,14 @@ package wakuv2
WAKU_CALL (waku_relay_get_num_peers_in_mesh(ctx, pubSubTopic, (WakuCallBack) callback, resp) );
}

static void cGoWakuGetNumConnectedPeers(void* ctx, char* pubSubTopic, void* resp) {
static void cGoWakuGetNumConnectedRelayPeers(void* ctx, char* pubSubTopic, void* resp) {
WAKU_CALL (waku_relay_get_num_connected_peers(ctx, pubSubTopic, (WakuCallBack) callback, resp) );
}

static void cGoWakuGetConnectedPeers(void* wakuCtx, void* resp) {
WAKU_CALL (waku_get_connected_peers(wakuCtx, (WakuCallBack) callback, resp) );
}

static void cGoWakuGetPeerIdsFromPeerStore(void* wakuCtx, void* resp) {
WAKU_CALL (waku_get_peerids_from_peerstore(wakuCtx, (WakuCallBack) callback, resp) );
}
Expand Down Expand Up @@ -328,7 +346,6 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol"

"github.com/waku-org/go-waku/waku/v2/protocol/legacy_store"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
Expand Down Expand Up @@ -1905,10 +1922,13 @@ func (w *Waku) ClearEnvelopesCache() {
w.envelopeCache = newTTLCache()
}

// TODO-nwaku
func (w *Waku) PeerCount() int {
return 0
// return w.node.PeerCount()
func (w *Waku) PeerCount() (int, error) {
peerCount, err := w.GetNumConnectedPeers()
if err != nil {
return 0, err
}

return peerCount, nil
}

// TODO-nwaku
Expand Down Expand Up @@ -2164,22 +2184,24 @@ func (w *Waku) AddRelayPeer(address multiaddr.Multiaddr) (peer.ID, error) {
}

func (w *Waku) DialPeer(address multiaddr.Multiaddr) error {
// TODO-nwaku
/*
ctx, cancel := context.WithTimeout(w.ctx, requestTimeout)
defer cancel()
return w.node.DialPeerWithMultiAddress(ctx, address) */
return nil
// Using WakuConnect so it matches the go-waku's behavior and terminology
return w.WakuConnect(address.String(), int(requestTimeout/time.Millisecond))
}

func (w *Waku) DialPeerByID(peerID peer.ID) error {
return w.WakuDialPeerById(peerID, string(relay.WakuRelayID_v200), 1000)
}
func (self *Waku) DropPeer(peerId peer.ID) error {
var resp = C.allocResp()
var cPeerId = C.CString(peerId.String())
defer C.freeResp(resp)
defer C.free(unsafe.Pointer(cPeerId))

func (w *Waku) DropPeer(peerID peer.ID) error {
// TODO-nwaku
// return w.node.ClosePeerById(peerID)
return nil
C.cGoWakuDisconnectPeerById(self.wakuCtx, cPeerId, resp)

if C.getRet(resp) == C.RET_OK {
return nil
}
errMsg := "error DisconnectPeerById: " +
C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return errors.New(errMsg)
}

func (w *Waku) ProcessingP2PMessages() bool {
Expand Down Expand Up @@ -2668,6 +2690,24 @@ func (self *Waku) WakuConnect(peerMultiAddr string, timeoutMs int) error {
return errors.New(errMsg)
}

func (self *Waku) WakuDialPeer(peerMultiAddr multiaddr.Multiaddr, protocol string, timeoutMs int) error {
var resp = C.allocResp()
var cPeerMultiAddr = C.CString(peerMultiAddr.String())
var cProtocol = C.CString(protocol)
defer C.freeResp(resp)
defer C.free(unsafe.Pointer(cPeerMultiAddr))
defer C.free(unsafe.Pointer(cProtocol))

C.cGoWakuDialPeer(self.wakuCtx, cPeerMultiAddr, cProtocol, C.int(timeoutMs), resp)

if C.getRet(resp) == C.RET_OK {
return nil
}
errMsg := "error DialPeer: " +
C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return errors.New(errMsg)
}

func (self *Waku) WakuDialPeerById(peerId peer.ID, protocol string, timeoutMs int) error {
var resp = C.allocResp()
var cPeerId = C.CString(peerId.String())
Expand Down Expand Up @@ -2753,7 +2793,7 @@ func (self *Waku) ListPeersInMesh(pubsubTopic string) (int, error) {
return 0, errors.New(errMsg)
}

func (self *Waku) GetNumConnectedPeers(paramPubsubTopic ...string) (int, error) {
func (self *Waku) GetNumConnectedRelayPeers(paramPubsubTopic ...string) (int, error) {
var pubsubTopic string
if len(paramPubsubTopic) == 0 {
pubsubTopic = ""
Expand All @@ -2766,22 +2806,58 @@ func (self *Waku) GetNumConnectedPeers(paramPubsubTopic ...string) (int, error)
defer C.freeResp(resp)
defer C.free(unsafe.Pointer(cPubsubTopic))

C.cGoWakuGetNumConnectedPeers(self.wakuCtx, cPubsubTopic, resp)
C.cGoWakuGetNumConnectedRelayPeers(self.wakuCtx, cPubsubTopic, resp)

if C.getRet(resp) == C.RET_OK {
numPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
numPeers, err := strconv.Atoi(numPeersStr)
if err != nil {
errMsg := "GetNumConnectedPeers - error converting string to int: " + err.Error()
errMsg := "GetNumConnectedRelayPeers - error converting string to int: " + err.Error()
return 0, errors.New(errMsg)
}
return numPeers, nil
}
errMsg := "error GetNumConnectedPeers: " +
errMsg := "error GetNumConnectedRelayPeers: " +
C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return 0, errors.New(errMsg)
}

func (self *Waku) GetConnectedPeers() (peer.IDSlice, error) {
var resp = C.allocResp()
defer C.freeResp(resp)
C.cGoWakuGetConnectedPeers(self.wakuCtx, resp)

if C.getRet(resp) == C.RET_OK {
peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
if peersStr == "" {
return peer.IDSlice{}, nil
}
// peersStr contains a comma-separated list of peer ids
itemsPeerIds := strings.Split(peersStr, ",")

var peers peer.IDSlice
for _, peerId := range itemsPeerIds {
id, err := peer.Decode(peerId)
if err != nil {
return nil, fmt.Errorf("GetConnectedPeers - decoding peerId: %w", err)
}
peers = append(peers, id)
}

return peers, nil
}
errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return nil, fmt.Errorf("GetConnectedPeers: %s", errMsg)
}

func (self *Waku) GetNumConnectedPeers() (int, error) {
connecterPeers, err := self.GetConnectedPeers()
if err != nil {
return 0, err
}
return len(connecterPeers), nil
}

func (self *Waku) GetPeerIdsFromPeerStore() (peer.IDSlice, error) {
var resp = C.allocResp()
defer C.freeResp(resp)
Expand Down Expand Up @@ -2842,22 +2918,6 @@ func (self *Waku) GetPeerIdsByProtocol(protocol string) (peer.IDSlice, error) {
return nil, errors.New(errMsg)
}

func (self *Waku) DisconnectPeerById(peerId peer.ID) error {
var resp = C.allocResp()
var cPeerId = C.CString(peerId.String())
defer C.freeResp(resp)
defer C.free(unsafe.Pointer(cPeerId))

C.cGoWakuDisconnectPeerById(self.wakuCtx, cPeerId, resp)

if C.getRet(resp) == C.RET_OK {
return nil
}
errMsg := "error DisconnectPeerById: " +
C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return errors.New(errMsg)
}

// func main() {

// config := WakuConfig{
Expand Down
94 changes: 91 additions & 3 deletions wakuv2/nwaku_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/cenkalti/backoff/v3"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"go.uber.org/zap"

Expand Down Expand Up @@ -196,7 +197,7 @@ func TestBasicWakuV2(t *testing.T) {

// Sanity check, not great, but it's probably helpful
err = tt.RetryWithBackOff(func() error {
numConnected, err := w.GetNumConnectedPeers()
numConnected, err := w.GetNumConnectedRelayPeers()
if err != nil {
return err
}
Expand All @@ -219,7 +220,7 @@ func TestBasicWakuV2(t *testing.T) {
require.True(t, slices.Contains(connectedStoreNodes, storeNode.ID), "nwaku should be connected to the store node")

// Disconnect from the store node
err = w.DisconnectPeerById(storeNode.ID)
err = w.DropPeer(storeNode.ID)
require.NoError(t, err)

// Check that we are indeed disconnected
Expand All @@ -228,10 +229,15 @@ func TestBasicWakuV2(t *testing.T) {
isDisconnected := !slices.Contains(connectedStoreNodes, storeNode.ID)
require.True(t, isDisconnected, "nwaku should be disconnected from the store node")

storeNodeMultiadd, err := multiaddr.NewMultiaddr(storeNodeInfo.ListenAddresses[0])
require.NoError(t, err)

// Re-connect
err = w.DialPeerByID(storeNode.ID)
err = w.DialPeer(storeNodeMultiadd)
require.NoError(t, err)

time.Sleep(1 * time.Second)

// Check that we are connected again
connectedStoreNodes, err = w.GetPeerIdsByProtocol(string(store.StoreQueryID_v300))
require.NoError(t, err)
Expand Down Expand Up @@ -545,6 +551,88 @@ func TestPeerExchange(t *testing.T) {
require.NoError(t, discV5Node.Stop()) */
}

func TestDial(t *testing.T) {
logger, err := zap.NewDevelopment()
require.NoError(t, err)

dialerNodeConfig := Config{
UseThrottledPublish: true,
ClusterID: 16,
}

// start node that will initiate the dial
dialerNodeWakuConfig := WakuConfig{
EnableRelay: true,
LogLevel: "DEBUG",
Discv5Discovery: false,
ClusterID: 16,
Shards: []uint16{64},
Discv5UdpPort: 9020,
TcpPort: 60020,
}

dialerNode, err := New(nil, "", &dialerNodeConfig, &dialerNodeWakuConfig, logger.Named("dialerNode"), nil, nil, nil, nil)
require.NoError(t, err)
require.NoError(t, dialerNode.Start())

time.Sleep(1 * time.Second)
gabrielmer marked this conversation as resolved.
Show resolved Hide resolved

receiverNodeConfig := Config{
UseThrottledPublish: true,
ClusterID: 16,
}

// start node that will receive the dial
receiverNodeWakuConfig := WakuConfig{
EnableRelay: true,
LogLevel: "DEBUG",
Discv5Discovery: false,
ClusterID: 16,
Shards: []uint16{64},
Discv5UdpPort: 9021,
TcpPort: 60021,
}

receiverNode, err := New(nil, "", &receiverNodeConfig, &receiverNodeWakuConfig, logger.Named("receiverNode"), nil, nil, nil, nil)
require.NoError(t, err)
require.NoError(t, receiverNode.Start())

time.Sleep(1 * time.Second)

receiverMultiaddr, err := receiverNode.ListenAddresses()
require.NoError(t, err)
require.NotNil(t, receiverMultiaddr)

// Check that both nodes start with no connected peers
dialerPeerCount, err := dialerNode.PeerCount()
require.NoError(t, err)
require.True(t, dialerPeerCount == 0, "Dialer node should have no connected peers")

receiverPeerCount, err := receiverNode.PeerCount()
require.NoError(t, err)
require.True(t, receiverPeerCount == 0, "Receiver node should have no connected peers")

// Dial
err = dialerNode.DialPeer(receiverMultiaddr[0])
require.NoError(t, err)

time.Sleep(1 * time.Second)

// Check that both nodes now have one connected peer
dialerPeerCount, err = dialerNode.PeerCount()
require.NoError(t, err)
require.True(t, dialerPeerCount == 1, "Dialer node should have 1 peer")

receiverPeerCount, err = receiverNode.PeerCount()
require.NoError(t, err)
require.True(t, receiverPeerCount == 1, "Receiver node should have 1 peer")

// Stop nodes
require.NoError(t, dialerNode.Stop())
require.NoError(t, receiverNode.Stop())

}

/*

func TestWakuV2Filter(t *testing.T) {
Expand Down
Loading