Skip to content

Commit

Permalink
Merge #32
Browse files Browse the repository at this point in the history
32: Wait for identify protocol to finish when dialing r=D4nte a=D4nte

Resolves #18

Co-authored-by: Franck Royer <franck@royer.one>
  • Loading branch information
bors[bot] and D4nte authored Apr 13, 2021
2 parents 41fa29f + 4b31a6a commit 707a019
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 55 deletions.
13 changes: 0 additions & 13 deletions src/chat/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import PeerId from 'peer-id';
import Waku from '../lib/waku';
import { WakuMessage } from '../lib/waku_message';
import { RelayDefaultTopic } from '../lib/waku_relay';
import { delay } from '../test_utils/';

import { ChatMessage } from './chat_message';

Expand Down Expand Up @@ -49,23 +48,11 @@ const ChatContentTopic = 'dingpu';
await waku.dial(opts.staticNode);
}

await new Promise((resolve) =>
waku.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
);

// TODO: identify if it is possible to listen to an event to confirm dial
// finished instead of an arbitrary delay. Tracked with
// https://github.com/status-im/js-waku/issues/18
await delay(2000);
// TODO: Automatically subscribe, tracked with
// https://github.com/status-im/js-waku/issues/17
await waku.relay.subscribe();
console.log('Subscribed to waku relay');

await new Promise((resolve) =>
waku.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
);

const staticNodeId = opts.staticNode?.getPeerId();
if (staticNodeId) {
const storePeerId = PeerId.createFromB58String(staticNodeId);
Expand Down
File renamed without changes.
67 changes: 66 additions & 1 deletion src/lib/waku.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,16 @@ import { bytes } from 'libp2p-noise/dist/src/@types/basic';
import { Noise } from 'libp2p-noise/dist/src/noise';
import TCP from 'libp2p-tcp';
import Multiaddr from 'multiaddr';
import pTimeout from 'p-timeout';
import PeerId from 'peer-id';

import { delay } from './delay';
import { RelayCodec, WakuRelay, WakuRelayPubsub } from './waku_relay';
import { StoreCodec, WakuStore } from './waku_store';

const WaitForIdentityFreqMs = 50;
const WaitForIdentityTimeoutMs = 2_000;

export interface CreateOptions {
listenAddresses: string[];
staticNoiseKey: bytes | undefined;
Expand Down Expand Up @@ -63,12 +68,57 @@ export default class Waku {
* @param peer The peer to dial
*/
async dial(peer: PeerId | Multiaddr | string) {
return this.libp2p.dialProtocol(peer, [RelayCodec, StoreCodec]);
await this.libp2p.dialProtocol(peer, [RelayCodec, StoreCodec]);
const peerId = toPeerId(peer);
await this.waitForIdentify(
peerId,
WaitForIdentityFreqMs,
WaitForIdentityTimeoutMs
);
}

async dialWithMultiAddr(peerId: PeerId, multiaddr: Multiaddr[]) {
this.libp2p.peerStore.addressBook.set(peerId, multiaddr);
await this.libp2p.dialProtocol(peerId, RelayCodec);
await this.waitForIdentify(
peerId,
WaitForIdentityFreqMs,
WaitForIdentityTimeoutMs
);
}

/**
* Wait for the identify protocol to be finished. This helps ensure
* we know what protocols the peer implements
* @param peerId
* @param frequencyMilliseconds
* @param maxTimeoutMilliseconds
* @throws If there is no known connection with this peer.
*/
async waitForIdentify(
peerId: PeerId,
frequencyMilliseconds: number,
maxTimeoutMilliseconds: number
): Promise<void> {
const checkProtocols = this._waitForIdentify.bind(
this,
peerId,
frequencyMilliseconds
)();

await pTimeout(checkProtocols, maxTimeoutMilliseconds);
}

async _waitForIdentify(peerId: PeerId, frequencyMilliseconds: number) {
while (true) {
const peer = this.libp2p.peerStore.get(peerId);
if (!peer) throw 'No connection to peer';
if (peer.protocols.length > 0) {
return;
} else {
await delay(frequencyMilliseconds);
}
}
}

async stop() {
Expand All @@ -91,3 +141,18 @@ export default class Waku {
return multiAddrWithId;
}
}

function toPeerId(peer: PeerId | Multiaddr | string): PeerId {
if (typeof peer === 'string') {
peer = new Multiaddr(peer);
}

if (Multiaddr.isMultiaddr(peer)) {
try {
peer = PeerId.createFromB58String(peer.getPeerId());
} catch (err) {
throw `${peer} is not a valid peer type`;
}
}
return peer;
}
32 changes: 3 additions & 29 deletions src/lib/waku_relay/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ import { expect } from 'chai';
import Pubsub from 'libp2p-interfaces/src/pubsub';

import { NOISE_KEY_1, NOISE_KEY_2 } from '../../test_utils/constants';
import { delay } from '../../test_utils/delay';
import { makeLogFileName } from '../../test_utils/log_file';
import { NimWaku } from '../../test_utils/nim_waku';
import { delay } from '../delay';
import Waku from '../waku';
import { WakuMessage } from '../waku_message';

Expand All @@ -27,15 +27,6 @@ describe('Waku Relay', () => {

await waku1.dialWithMultiAddr(waku2.libp2p.peerId, waku2.libp2p.multiaddrs);

await Promise.all([
new Promise((resolve) =>
waku1.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
),
new Promise((resolve) =>
waku2.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
),
]);

await waku1.relay.subscribe();
await waku2.relay.subscribe();

Expand Down Expand Up @@ -176,16 +167,7 @@ describe('Waku Relay', () => {

await waku.dial(await nimWaku.getMultiaddrWithId());

await delay(100);
await new Promise((resolve) =>
waku.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
);

await waku.relay.subscribe();

await new Promise((resolve) =>
waku.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
);
});

afterEach(async function () {
Expand Down Expand Up @@ -222,6 +204,8 @@ describe('Waku Relay', () => {
});

it('Nim publishes to js', async function () {
await delay(200);

const message = WakuMessage.fromUtf8String('Here is another message.');

const receivedPromise = waitForNextData(waku.libp2p.pubsub);
Expand Down Expand Up @@ -259,16 +243,6 @@ describe('Waku Relay', () => {
waku2.dial(nimWakuMultiaddr),
]);

await delay(100);
await Promise.all([
new Promise((resolve) =>
waku1.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
),
new Promise((resolve) =>
waku2.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
),
]);

await Promise.all([waku1.relay.subscribe(), waku2.relay.subscribe()]);

await Promise.all([
Expand Down
10 changes: 0 additions & 10 deletions src/lib/waku_store/index.spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { expect } from 'chai';

import {
delay,
makeLogFileName,
NimWaku,
NOISE_KEY_1,
Expand All @@ -23,11 +22,6 @@ describe('Waku Store', () => {
const waku0 = await Waku.create({ staticNoiseKey: NOISE_KEY_2 });
await waku0.dial(await nimWaku.getMultiaddrWithId());

await delay(100);
await new Promise((resolve) =>
waku0.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
);

await waku0.relay.subscribe();

await new Promise((resolve) =>
Expand All @@ -50,8 +44,6 @@ describe('Waku Store', () => {
waku = await Waku.create({ staticNoiseKey: NOISE_KEY_1 });
await waku.dial(await nimWaku.getMultiaddrWithId());

await delay(500);

const nimPeerId = await nimWaku.getPeerId();

const messages = await waku.store.queryHistory(nimPeerId);
Expand All @@ -73,8 +65,6 @@ describe('Waku Store', () => {
waku = await Waku.create({ staticNoiseKey: NOISE_KEY_1 });
await waku.dial(await nimWaku.getMultiaddrWithId());

await delay(500);

const nimPeerId = await nimWaku.getPeerId();

const messages = await waku.store.queryHistory(nimPeerId);
Expand Down
2 changes: 1 addition & 1 deletion src/test_utils/async_fs.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import fs, { promises as asyncFs } from 'fs';
import { promisify } from 'util';

import { delay } from './delay';
import { delay } from '../lib/delay';
export const existsAsync = (filepath: string) =>
asyncFs.access(filepath, fs.constants.F_OK);

Expand Down
1 change: 0 additions & 1 deletion src/test_utils/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
export * from './async_fs';
export * from './constants';
export * from './delay';
export * from './log_file';
export * from './nim_waku';

0 comments on commit 707a019

Please sign in to comment.