feat(lightpush): introduce ReliabilityMonitor and allow send retries #2130

Merged: 5 commits, Sep 17, 2024
4 changes: 2 additions & 2 deletions packages/sdk/src/index.ts
@@ -14,9 +14,9 @@ export {
defaultLibp2p,
createLibp2pAndUpdateOptions
} from "./create/index.js";
-export { wakuLightPush } from "./protocols/light_push.js";
+export { wakuLightPush } from "./protocols/lightpush/index.js";
 export { wakuFilter } from "./protocols/filter/index.js";
-export { wakuStore } from "./protocols/store.js";
+export { wakuStore } from "./protocols/store/index.js";

export * as waku from "@waku/core";
export * as utils from "@waku/utils";
7 changes: 3 additions & 4 deletions packages/sdk/src/protocols/filter/subscription_manager.ts
@@ -20,11 +20,10 @@ import {
import { WakuMessage } from "@waku/proto";
import { groupByContentTopic, Logger } from "@waku/utils";

+import { ReliabilityMonitorManager } from "../../reliability_monitor/index.js";
+import { ReceiverReliabilityMonitor } from "../../reliability_monitor/receiver.js";

 import { DEFAULT_KEEP_ALIVE, DEFAULT_SUBSCRIBE_OPTIONS } from "./constants.js";
-import {
-  ReceiverReliabilityMonitor,
-  ReliabilityMonitorManager
-} from "./reliability_monitor.js";

const log = new Logger("sdk:filter:subscription_manager");

@@ -13,13 +13,17 @@ import {
} from "@waku/interfaces";
import { ensurePubsubTopicIsConfigured, Logger } from "@waku/utils";

-import { BaseProtocolSDK } from "./base_protocol.js";
+import { ReliabilityMonitorManager } from "../../reliability_monitor/index.js";
+import { SenderReliabilityMonitor } from "../../reliability_monitor/sender.js";
+import { BaseProtocolSDK } from "../base_protocol.js";

const log = new Logger("sdk:light-push");

class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
public readonly protocol: LightPushCore;

private readonly reliabilityMonitor: SenderReliabilityMonitor;

public constructor(
connectionManager: ConnectionManager,
libp2p: Libp2p,
@@ -33,6 +37,10 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
}
);

this.reliabilityMonitor = ReliabilityMonitorManager.createSenderMonitor(
this.renewPeer.bind(this)
);

this.protocol = this.core as LightPushCore;
}

@@ -89,16 +97,23 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
successes.push(success);
}
if (failure) {
failures.push(failure);
Collaborator: Leaving this for some context; I'll make an issue for it later so that we can discuss. In status-go, where retry is already implemented, they do things differently, and we will probably do something similar too: when a message is sent it is queued, then retried, and once it definitively fails, that is communicated to the consumer through something similar to an event API. cc @waku-org/js-waku

 if (failure.peerId) {
-  try {
-    await this.renewPeer(failure.peerId);
-    log.info("Renewed peer", failure.peerId.toString());
-  } catch (error) {
-    log.error("Failed to renew peer", error);
+  const peer = this.connectedPeers.find((connectedPeer) =>
+    connectedPeer.id.equals(failure.peerId)
+  );
+  if (peer) {
Collaborator: I think here we should retry to any available peer — otherwise we lose the message if the peer got dropped (renewed, went offline, or just a networking issue).

Collaborator (author): Well, this block is executed WHEN a particular peer fails to send a lightpush request. Example: we have 3 connected peers and one fails to send; this is the block that will reattempt delivery for that peer before renewing it (instead of just renewing). We aren't losing the message in any case: the reliability monitor later (even after reattempts fail) resends the lightpush request after renewal.

Collaborator: Hm, I think we should align here on what the desired behavior is. Considering that what we care about at this stage is successfully pushing a message (i.e. no errors while sending): at least 1 successful push is enough, in which case no retries are needed; if all sends fail, then retry, but not necessarily to the same peer — any peer; and if during all of this any peer fails 3 times, we renew it. I probably summarized it for myself only, but just clarifying.

Collaborator (author): Technically, yes, I agree that one successful push would be enough. I can't really think of a case where we would need redundancy if one of the peers can assure us that it indeed relayed the message further into GossipSub. However, for now, without LightPush V2, it's not trivial to get that confirmation. Thus having redundancy + retries is good for now, and we can revisit later if our apps perform well. (ref: https://discord.com/channels/1110799176264056863/1284121724433993758/1285014955023798342 as well)

Collaborator: Decoupled into follow-up #2140.

log.info(`
Failed to send message to peer ${failure.peerId}.
Retrying the message with the same peer in the background.
If this fails, the peer will be renewed.
`);
void this.reliabilityMonitor.attemptRetriesOrRenew(
failure.peerId,
() => this.protocol.send(encoder, message, peer)
Collaborator: Aren't we risking getting into recursion here? Apologies if I missed that part of the code. lightPush fails -> retry initiated -> lightPush fails -> ...

Collaborator (author, @danisharora099, Sep 13, 2024): Hm, that's a good point. Technically, recursion would have been a neater solution here, but this is not it. Here we detect that one peer has failed to send the lightpush request and reattempt a few times; if it keeps failing, we renew the peer and attempt (only once) to send the lightpush request through the new peer. Recursion might be neater, but it seems like overkill for now, TBH. We can find peace in the fact that:

  • we already use multiple peers for the first send
  • even if one of the peers fails, we will reattempt
    • if even that fails, we renew the peer and use the new one to send

For it to fail, ALL peers would have to literally just not entertain our requests. In a case where we disregard that, introducing recursion here would be a neat solution.
);
}
}

failures.push(failure);
}
} else {
log.error("Failed unexpectedly while sending:", result.reason);
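The behavior the reviewers converge on — push to several peers in parallel and count the send successful when at least one peer accepts it — can be sketched independently of js-waku types. All names below (`sendToMany`, `sendToPeer`) are illustrative assumptions, not the PR's API:

```typescript
// Illustrative sketch of redundant sending: push to every peer in parallel,
// treat the overall send as successful if at least one peer succeeds, and
// collect the failed peers as candidates for retry or renewal.
async function sendToMany(
  peers: string[],
  sendToPeer: (peer: string) => Promise<boolean>
): Promise<{ successes: string[]; failures: string[] }> {
  // allSettled never rejects, so one bad peer cannot sink the whole send
  const settled = await Promise.allSettled(peers.map((p) => sendToPeer(p)));
  const successes: string[] = [];
  const failures: string[] = [];
  settled.forEach((result, i) => {
    if (result.status === "fulfilled" && result.value) {
      successes.push(peers[i]);
    } else {
      // rejected, or resolved false: this peer goes into the retry/renewal path
      failures.push(peers[i]);
    }
  });
  return { successes, failures };
}
```

Under this scheme the overall send succeeds when `successes.length >= 1`, and only the peers in `failures` need the retry-then-renew treatment.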
@@ -10,7 +10,7 @@ import {
import { messageHash } from "@waku/message-hash";
import { ensurePubsubTopicIsConfigured, isDefined, Logger } from "@waku/utils";

-import { BaseProtocolSDK } from "./base_protocol.js";
+import { BaseProtocolSDK } from "../base_protocol.js";

const DEFAULT_NUM_PEERS = 1;

70 changes: 70 additions & 0 deletions packages/sdk/src/reliability_monitor/index.ts
@@ -0,0 +1,70 @@
import type { Peer, PeerId } from "@libp2p/interface";
import {
ContentTopic,
CoreProtocolResult,
PubsubTopic
} from "@waku/interfaces";

import { ReceiverReliabilityMonitor } from "./receiver.js";
import { SenderReliabilityMonitor } from "./sender.js";

export class ReliabilityMonitorManager {
private static receiverMonitors: Map<
PubsubTopic,
ReceiverReliabilityMonitor
> = new Map();
private static senderMonitor: SenderReliabilityMonitor | undefined;

public static createReceiverMonitor(
pubsubTopic: PubsubTopic,
getPeers: () => Peer[],
renewPeer: (peerId: PeerId) => Promise<Peer>,
getContentTopics: () => ContentTopic[],
protocolSubscribe: (
pubsubTopic: PubsubTopic,
peer: Peer,
contentTopics: ContentTopic[]
) => Promise<CoreProtocolResult>
): ReceiverReliabilityMonitor {
if (ReliabilityMonitorManager.receiverMonitors.has(pubsubTopic)) {
return ReliabilityMonitorManager.receiverMonitors.get(pubsubTopic)!;
}

const monitor = new ReceiverReliabilityMonitor(
pubsubTopic,
getPeers,
renewPeer,
getContentTopics,
protocolSubscribe
);
ReliabilityMonitorManager.receiverMonitors.set(pubsubTopic, monitor);
return monitor;
}

public static createSenderMonitor(
renewPeer: (peerId: PeerId) => Promise<Peer>
): SenderReliabilityMonitor {
if (!ReliabilityMonitorManager.senderMonitor) {
ReliabilityMonitorManager.senderMonitor = new SenderReliabilityMonitor(
renewPeer
);
}
return ReliabilityMonitorManager.senderMonitor;
}

private constructor() {}

public static stop(pubsubTopic: PubsubTopic): void {
this.receiverMonitors.delete(pubsubTopic);
this.senderMonitor = undefined;
}

public static stopAll(): void {
for (const [pubsubTopic, monitor] of this.receiverMonitors) {
monitor.setMaxMissedMessagesThreshold(undefined);
monitor.setMaxPingFailures(undefined);
this.receiverMonitors.delete(pubsubTopic);
this.senderMonitor = undefined;
}
}
}
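`ReliabilityMonitorManager` above is a static per-topic registry: `createReceiverMonitor` returns the existing monitor when one is already registered for the pubsub topic. A stripped-down sketch of that pattern, with illustrative names and none of the libp2p types:

```typescript
// Simplified sketch of the per-topic singleton registry used by
// ReliabilityMonitorManager above (Monitor/MonitorRegistry are
// illustrative stand-ins, not js-waku classes).
class Monitor {
  public constructor(public readonly topic: string) {}
}

class MonitorRegistry {
  private static monitors: Map<string, Monitor> = new Map();

  // Returns the existing monitor for a topic, creating it on first use.
  public static getOrCreate(topic: string): Monitor {
    let monitor = this.monitors.get(topic);
    if (!monitor) {
      monitor = new Monitor(topic);
      this.monitors.set(topic, monitor);
    }
    return monitor;
  }

  // Mirrors ReliabilityMonitorManager.stop(pubsubTopic).
  public static stop(topic: string): void {
    this.monitors.delete(topic);
  }
}

const a = MonitorRegistry.getOrCreate("/waku/2/default-waku/proto");
const b = MonitorRegistry.getOrCreate("/waku/2/default-waku/proto");
console.log(a === b); // true: same instance for the same topic
```

Keeping the map static means every subscription over the same pubsub topic shares one monitor, so missed-message counts are not split across instances.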
@@ -21,53 +21,6 @@ const log = new Logger("sdk:receiver:reliability_monitor");

const DEFAULT_MAX_PINGS = 3;

export class ReliabilityMonitorManager {
private static receiverMonitors: Map<
PubsubTopic,
ReceiverReliabilityMonitor
> = new Map();

public static createReceiverMonitor(
pubsubTopic: PubsubTopic,
getPeers: () => Peer[],
renewPeer: (peerId: PeerId) => Promise<Peer>,
getContentTopics: () => ContentTopic[],
protocolSubscribe: (
pubsubTopic: PubsubTopic,
peer: Peer,
contentTopics: ContentTopic[]
) => Promise<CoreProtocolResult>
): ReceiverReliabilityMonitor {
if (ReliabilityMonitorManager.receiverMonitors.has(pubsubTopic)) {
return ReliabilityMonitorManager.receiverMonitors.get(pubsubTopic)!;
}

const monitor = new ReceiverReliabilityMonitor(
pubsubTopic,
getPeers,
renewPeer,
getContentTopics,
protocolSubscribe
);
ReliabilityMonitorManager.receiverMonitors.set(pubsubTopic, monitor);
return monitor;
}

private constructor() {}

public static destroy(pubsubTopic: PubsubTopic): void {
this.receiverMonitors.delete(pubsubTopic);
}

public static destroyAll(): void {
for (const [pubsubTopic, monitor] of this.receiverMonitors) {
monitor.setMaxMissedMessagesThreshold(undefined);
monitor.setMaxPingFailures(undefined);
this.receiverMonitors.delete(pubsubTopic);
}
}
}

export class ReceiverReliabilityMonitor {
private receivedMessagesHashes: ReceivedMessageHashes;
private missedMessagesByPeer: Map<string, number> = new Map();
57 changes: 57 additions & 0 deletions packages/sdk/src/reliability_monitor/sender.ts
@@ -0,0 +1,57 @@
import type { Peer, PeerId } from "@libp2p/interface";
import { CoreProtocolResult, PeerIdStr } from "@waku/interfaces";
import { Logger } from "@waku/utils";

const log = new Logger("sdk:sender:reliability_monitor");

const DEFAULT_MAX_ATTEMPTS_BEFORE_RENEWAL = 3;

export class SenderReliabilityMonitor {
private attempts: Map<PeerIdStr, number> = new Map();
private readonly maxAttemptsBeforeRenewal =
DEFAULT_MAX_ATTEMPTS_BEFORE_RENEWAL;

public constructor(private renewPeer: (peerId: PeerId) => Promise<Peer>) {}

public async attemptRetriesOrRenew(
peerId: PeerId,
protocolSend: () => Promise<CoreProtocolResult>
): Promise<void> {
const peerIdStr = peerId.toString();
const currentAttempts = this.attempts.get(peerIdStr) || 0;
this.attempts.set(peerIdStr, currentAttempts + 1);

if (currentAttempts + 1 < this.maxAttemptsBeforeRenewal) {
try {
const result = await protocolSend();
if (result.success) {
log.info(`Successfully sent message after retry to ${peerIdStr}`);
this.attempts.delete(peerIdStr);
} else {
log.error(
`Failed to send message after retry to ${peerIdStr}: ${result.failure}`
);
await this.attemptRetriesOrRenew(peerId, protocolSend);
}
} catch (error) {
log.error(
`Failed to send message after retry to ${peerIdStr}: ${error}`
);
await this.attemptRetriesOrRenew(peerId, protocolSend);
}
} else {
try {
const newPeer = await this.renewPeer(peerId);
log.info(
`Renewed peer ${peerId.toString()} to ${newPeer.id.toString()}`
);

this.attempts.delete(peerIdStr);
this.attempts.set(newPeer.id.toString(), 0);
await protocolSend();
Collaborator: prev — https://github.com/waku-org/js-waku/pull/2130/files#r1758702483. Here we call .send in both branches, making no exit from the recursion (i.e. it will be called over and over, it seems to me). So I think here we should do .send to newPeer instead, because next time attemptRetriesOrRenew will be called for peerIdStr, and at that point this.attempts will not have it, so it will just continue. I believe this is the reason why you needed to implement the .stop operation on the manager entity.

Collaborator (author, @danisharora099, Sep 16, 2024): this.protocol.send() is different from the SDK's lightpush.send() — SDK.send() calls this.protocol.send() internally. We are indeed doing this.protocol.send(peer) by binding the protocolSend() function call:

void this.reliabilityMonitor.attemptRetriesOrRenew(
  failure.peerId,
  () => this.protocol.send(encoder, message, peer)
);

Collaborator: Oh right, thanks for clarifying — then it is not an issue; we can have a recursion here.
} catch (error) {
log.error(`Failed to renew peer ${peerId.toString()}: ${error}`);
}
}
}
}
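For comparison with the recursive `attemptRetriesOrRenew` above, the same bounded-retry idea can be written iteratively, which makes the exit condition explicit. `sendWithRetries` and `SendResult` below are illustrative assumptions, not part of the PR:

```typescript
// Iterative sketch of the bounded-retry core of attemptRetriesOrRenew:
// retry up to maxAttempts, then report failure so the caller can renew
// the peer (the renewal step itself is left to the caller here).
type SendResult = { success: boolean };

async function sendWithRetries(
  sendFn: () => Promise<SendResult>,
  maxAttempts = 3
): Promise<boolean> {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      const result = await sendFn();
      if (result.success) return true;
    } catch {
      // a thrown error counts the same as a failed result: try again
    }
  }
  return false; // attempts exhausted; caller decides to renew the peer
}
```

The loop bound plays the role that the `attempts` map plays in the class above; either way, the send is attempted at most `maxAttempts` times per peer before renewal.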
8 changes: 4 additions & 4 deletions packages/sdk/src/waku.ts
@@ -17,9 +17,9 @@ import { Protocols } from "@waku/interfaces";
import { Logger } from "@waku/utils";

import { wakuFilter } from "./protocols/filter/index.js";
-import { ReliabilityMonitorManager } from "./protocols/filter/reliability_monitor.js";
-import { wakuLightPush } from "./protocols/light_push.js";
-import { wakuStore } from "./protocols/store.js";
+import { wakuLightPush } from "./protocols/lightpush/index.js";
+import { wakuStore } from "./protocols/store/index.js";
+import { ReliabilityMonitorManager } from "./reliability_monitor/index.js";

export const DefaultPingKeepAliveValueSecs = 5 * 60;
export const DefaultRelayKeepAliveValueSecs = 5 * 60;
@@ -196,7 +196,7 @@ export class WakuNode implements Waku {
}

public async stop(): Promise<void> {
-    ReliabilityMonitorManager.destroyAll();
+    ReliabilityMonitorManager.stopAll();
this.connectionManager.stop();
await this.libp2p.stop();
}
14 changes: 9 additions & 5 deletions packages/tests/tests/light-push/peer_management.spec.ts
@@ -1,5 +1,6 @@
import { LightNode } from "@waku/interfaces";
import { createEncoder, utf8ToBytes } from "@waku/sdk";
import { delay } from "@waku/utils";
import { expect } from "chai";
import { describe } from "mocha";

@@ -78,18 +79,21 @@ describe("Waku Light Push: Peer Management: E2E", function () {
expect(response2.failures).to.have.length(1);
expect(response2.failures?.[0].peerId).to.equal(peerToDisconnect);

-  // send another lightpush request -- renewal should have triggered and a new peer should be used instead of the disconnected one
+  // send another lightpush request
+  // reattempts to send should be triggered
+  // then renewal should happen
+  // so one failure should exist
const response3 = await waku.lightPush.send(encoder, {
payload: utf8ToBytes("Hello_World")
});

// wait for reattempts to finish as they are async and not awaited
await delay(500);

expect(response3.successes.length).to.be.equal(
-    waku.lightPush.numPeersToUse
+    waku.lightPush.numPeersToUse - 1
);

expect(response3.successes).to.not.include(peerToDisconnect);
if (response3.failures) {
expect(response3.failures.length).to.equal(0);
}
});
});