From 4e5e02f029a929f2aae1f9c71dc81e0d6f2af863 Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Wed, 9 Oct 2024 11:45:14 +0530 Subject: [PATCH] feat: Filter reacts to peer:disconnect event, add tests --- .../protocols/filter/subscription_manager.ts | 3 +- packages/sdk/src/reliability_monitor/index.ts | 7 ++- .../sdk/src/reliability_monitor/receiver.ts | 11 ++++- .../tests/filter/peer_management.spec.ts | 48 +++++++++++++++++++ 4 files changed, 65 insertions(+), 4 deletions(-) diff --git a/packages/sdk/src/protocols/filter/subscription_manager.ts b/packages/sdk/src/protocols/filter/subscription_manager.ts index d8578fe1cf..71dd9959f0 100644 --- a/packages/sdk/src/protocols/filter/subscription_manager.ts +++ b/packages/sdk/src/protocols/filter/subscription_manager.ts @@ -53,7 +53,8 @@ export class SubscriptionManager implements ISubscription { this.getPeers.bind(this), this.renewPeer.bind(this), () => Array.from(this.subscriptionCallbacks.keys()), - this.protocol.subscribe.bind(this.protocol) + this.protocol.subscribe.bind(this.protocol), + this.protocol.addLibp2pEventListener.bind(this.protocol) ); } diff --git a/packages/sdk/src/reliability_monitor/index.ts b/packages/sdk/src/reliability_monitor/index.ts index b0d8dc1c3a..f5bc48879d 100644 --- a/packages/sdk/src/reliability_monitor/index.ts +++ b/packages/sdk/src/reliability_monitor/index.ts @@ -2,6 +2,7 @@ import type { Peer, PeerId } from "@libp2p/interface"; import { ContentTopic, CoreProtocolResult, + Libp2p, PubsubTopic } from "@waku/interfaces"; @@ -24,7 +25,8 @@ export class ReliabilityMonitorManager { pubsubTopic: PubsubTopic, peer: Peer, contentTopics: ContentTopic[] - ) => Promise + ) => Promise, + addLibp2pEventListener: Libp2p["addEventListener"] ): ReceiverReliabilityMonitor { if (ReliabilityMonitorManager.receiverMonitors.has(pubsubTopic)) { return ReliabilityMonitorManager.receiverMonitors.get(pubsubTopic)!; @@ -35,7 +37,8 @@ export class ReliabilityMonitorManager { getPeers, renewPeer, getContentTopics, - protocolSubscribe + protocolSubscribe, + addLibp2pEventListener ); ReliabilityMonitorManager.receiverMonitors.set(pubsubTopic, monitor); return monitor; diff --git a/packages/sdk/src/reliability_monitor/receiver.ts b/packages/sdk/src/reliability_monitor/receiver.ts index 985c52a59d..400485a0e9 100644 --- a/packages/sdk/src/reliability_monitor/receiver.ts +++ b/packages/sdk/src/reliability_monitor/receiver.ts @@ -3,6 +3,7 @@ import { ContentTopic, CoreProtocolResult, IProtoMessage, + Libp2p, PeerIdStr, PubsubTopic } from "@waku/interfaces"; @@ -38,7 +39,8 @@ export class ReceiverReliabilityMonitor { pubsubTopic: PubsubTopic, peer: Peer, contentTopics: ContentTopic[] - ) => Promise + ) => Promise, + private addLibp2pEventListener: Libp2p["addEventListener"] ) { const allPeerIdStr = this.getPeers().map((p) => p.id.toString()); @@ -49,6 +51,13 @@ export class ReceiverReliabilityMonitor { } }; allPeerIdStr.forEach((peerId) => this.missedMessagesByPeer.set(peerId, 0)); + + this.addLibp2pEventListener("peer:disconnect", (evt) => { + const peerId = evt.detail; + if (this.getPeers().some((p) => p.id.equals(peerId))) { + void this.renewAndSubscribePeer(peerId); + } + }); } public setMaxMissedMessagesThreshold(value: number | undefined): void { diff --git a/packages/tests/tests/filter/peer_management.spec.ts b/packages/tests/tests/filter/peer_management.spec.ts index 8eb7db45d7..7f3525ca38 100644 --- a/packages/tests/tests/filter/peer_management.spec.ts +++ b/packages/tests/tests/filter/peer_management.spec.ts @@ -272,4 +272,52 @@ describe("Waku Filter: Peer Management: E2E", function () { expect(waku.filter.connectedPeers.length).to.equal(2); }); + + it("Renews peer for Filter on peer:disconnect event", async function () { + this.timeout(30000); + + const messages: DecodedMessage[] = []; + const { error, subscription } = await waku.filter.subscribe( + [decoder], + (msg) => { + messages.push(msg); + } + ); + + if (error) { + throw error; + } + + const initialPeers = waku.filter.connectedPeers; + expect(initialPeers.length).to.equal(waku.filter.numPeersToUse); + + const peerToDisconnect = initialPeers[0]; + await waku.connectionManager.dropConnection(peerToDisconnect.id); + + await delay(5000); + + expect(waku.filter.connectedPeers.length).to.equal( + waku.filter.numPeersToUse + ); + + const stillConnected = waku.filter.connectedPeers.some((peer) => + peer.id.equals(peerToDisconnect.id) + ); + expect(stillConnected).to.be.false; + + await waku.lightPush.send(encoder, { + payload: utf8ToBytes("Hello after disconnect") + }); + + await delay(2000); + + expect(messages.length).to.equal(1); + expect(new TextDecoder().decode(messages[0].payload)).to.equal( + "Hello after disconnect" + ); + + const pingResult = await subscription.ping(); + expect(pingResult.successes.length).to.equal(waku.filter.numPeersToUse); + expect(pingResult.failures.length).to.equal(0); + }); });