From 143bc44ff0f5aba81dc9476276510306d7185179 Mon Sep 17 00:00:00 2001 From: John Turpish Date: Wed, 10 Aug 2022 17:58:13 -0400 Subject: [PATCH 1/5] RTCPeerConnection does not exist during a test? --- package.json | 1 + src/stream.ts | 33 +++++++++++++++++++++++++++------ src/transport.ts | 9 +++++---- test/stream.spec.ts | 12 ++++++++++++ 4 files changed, 45 insertions(+), 10 deletions(-) create mode 100644 test/stream.spec.ts diff --git a/package.json b/package.json index f772f89..a096f18 100644 --- a/package.json +++ b/package.json @@ -17,6 +17,7 @@ "@typescript-eslint/parser": "^5.32.0", "aegir": "^37.4.6", "prettier": "^2.7.1", + "rtc": "^3.4.0", "typescript": "^4.7.4" }, "dependencies": { diff --git a/src/stream.ts b/src/stream.ts index ae44495..d52902c 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -1,19 +1,35 @@ -import { Stream } from '@libp2p/interface-connection'; -import { StreamStat } from '@libp2p/interface-connection'; +import { Stream, StreamStat, Direction, StreamTimeline } from '@libp2p/interface-connection'; // import { logger } from '@libp2p/logger'; import { Source } from 'it-stream-types'; import { Sink } from 'it-stream-types'; import { pushable, Pushable } from 'it-pushable'; import defer, { DeferredPromise } from 'p-defer'; import merge from 'it-merge'; -import { Uint8ArrayList } from 'uint8arraylist' +import { Uint8ArrayList } from 'uint8arraylist'; // const log = logger('libp2p:webrtc:connection'); +export class WebRTCStreamStat implements StreamStat { + direction: Direction; + timeline: StreamTimeline; + protocol: string; + + constructor(d: Direction, t?: StreamTimeline) { + this.direction = d; + if (t) { + this.timeline = t; + } else { + this.timeline = { open: new Date().getTime() }; + } + this.protocol = 'webrtc'; + } +} + type StreamInitOpts = { channel: RTCDataChannel; metadata?: Record; - stat: StreamStat; + stat?: StreamStat; + direction?: Direction; }; export class WebRTCStream implements Stream { @@ -46,8 +62,13 @@ export class WebRTCStream implements Stream { constructor(opts: StreamInitOpts) { this.channel = opts.channel; this.id = this.channel.label; - - this.stat = opts.stat; + if (opts.stat) { + this.stat = opts.stat; + } else if (opts.direction) { + this.stat = new WebRTCStreamStat(opts.direction); + } else { + throw Error('Caller needs to specify at least direction, if not stat'); + } switch (this.channel.readyState) { case 'open': this.opened.resolve(); diff --git a/src/transport.ts b/src/transport.ts index 302bf72..e26630f 100644 --- a/src/transport.ts +++ b/src/transport.ts @@ -45,7 +45,8 @@ export class WebRTCTransport implements Transport, Initializable { } async _connect(ma: Multiaddr, options: WebRTCDialOptions) { - let registrar = (await this.components.promise).getRegistrar(); + let comps = await this.components.promise; + // let registrar = (await this.components.promise).getRegistrar(); let peerConnection = new RTCPeerConnection(); // create data channel let handshakeDataChannel = peerConnection.createDataChannel('data', { negotiated: true, id: 1 }); @@ -85,11 +86,11 @@ export class WebRTCTransport implements Transport, Initializable { setTimeout(dataChannelOpenPromise.reject, 10000); await dataChannelOpenPromise; - let myPeerId = this.components.getPeerId(); + let myPeerId = comps.getPeerId(); let rps = ma.getPeerId(); if (!rps) { throw new Error('TODO Do we really need a peer ID ?'); - } + } let theirPeerId = p.peerIdFromString(rps); // do noise handshake @@ -97,7 +98,7 @@ export class WebRTCTransport implements Transport, Initializable { // is the concatenation of the of the two TLS fingerprints of A and B in their multihash byte representation, sorted in ascending order. let fingerprintsPrologue = [myPeerId.multihash, theirPeerId.multihash].sort().join(''); let noise = new Noise(myPeerId.privateKey, undefined, stablelib, utf8.encode(fingerprintsPrologue)); - let wrappedChannel = new WebRTCStream({ channel: handshakeDataChannel, stat: {direction: 'outbound', timeline: {open: 0}} }); + let wrappedChannel = new WebRTCStream({ channel: handshakeDataChannel, stat: { direction: 'outbound', timeline: { open: 0 } } }); await noise.secureOutbound(myPeerId, wrappedChannel, theirPeerId); return new WebRTCConnection({ diff --git a/test/stream.spec.ts b/test/stream.spec.ts new file mode 100644 index 0000000..b041654 --- /dev/null +++ b/test/stream.spec.ts @@ -0,0 +1,12 @@ +import { expect } from 'chai'; +import * as underTest from '../src/stream.js'; +// import { RTCPeerConnection } from 'rtc'; + +describe('stream stats', () => { + it('can construct', () => { + // let pc = new RTCPeerConnection(); + // let dc = pc.createDataChannel('whatever', { negotiated: true, id: 91 }); + let s = new underTest.WebRTCStream({ channel: {} }); + expect(s.stat.timeline.close).to.be.null; + }); +}); From d6ddfbf05c27d5812ac6f8d903149d8f95d4bb71 Mon Sep 17 00:00:00 2001 From: Chinmay Kousik Date: Thu, 11 Aug 2022 19:04:59 +0530 Subject: [PATCH 2/5] initial connection implementation --- package.json | 6 +- src/connection.ts | 212 ++++++++++++++++++++++++++++++++++------ src/stream.ts | 2 +- src/transport.ts | 31 +++--- test/connection.spec.ts | 2 + test/util.ts | 6 ++ 6 files changed, 213 insertions(+), 46 deletions(-) create mode 100644 test/connection.spec.ts create mode 100644 test/util.ts diff --git a/package.json b/package.json index f772f89..844f3bb 100644 --- a/package.json +++ b/package.json @@ -13,6 +13,7 @@ "format": "prettier --write src/*.ts" }, "devDependencies": { + "@libp2p/interface-mocks": "^4.0.1", "@types/uuid": "^8.3.4", "@typescript-eslint/parser": "^5.32.0", "aegir": "^37.4.6", @@ -22,15 +23,18 @@ "dependencies": { "@chainsafe/libp2p-noise": "git://github.com/ChainSafe/js-libp2p-noise.git#15f7a6e700a69c9a40abb82d989a55032d5cf687", "@libp2p/components": "^2.0.3", - "@libp2p/interface-connection": "^3.0.0", + "@libp2p/interface-connection": "^3.0.1", + "@libp2p/interface-registrar": "^2.0.3", "@libp2p/interface-transport": "^1.0.3", "@libp2p/interfaces": "^3.0.3", "@libp2p/logger": "^2.0.0", + "@libp2p/multistream-select": "^3.0.0", "@libp2p/peer-id": "^1.1.15", "abortable-iterator": "^4.0.2", "it-merge": "^1.0.4", "p-defer": "^4.0.0", "socket.io-client": "^4.1.2", + "timeout-abort-controller": "^3.0.0", "uuid": "^8.3.2" } } diff --git a/src/connection.ts b/src/connection.ts index 5fb5782..ff2204f 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -4,70 +4,226 @@ import { AbortOptions } from '@libp2p/interfaces'; import { logger } from '@libp2p/logger'; import { Multiaddr } from '@multiformats/multiaddr'; import { v4 as genUuid } from 'uuid'; +import { Components } from '@libp2p/components'; +import defer from 'p-defer'; +import { TimeoutController } from 'timeout-abort-controller'; +import { WebRTCStream } from './stream'; +import { select as msselect, handle as mshandle } from '@libp2p/multistream-select'; +import { Duplex } from 'it-stream-types'; +import { Uint8ArrayList } from 'uint8arraylist'; const log = logger('libp2p:webrtc:connection'); type ConnectionInit = { + components: Components; id: string; localPeer: PeerId; localAddr?: Multiaddr; remoteAddr: Multiaddr; + remotePeer: PeerId; direction: ic.Direction; tags?: string[]; pc: RTCPeerConnection; - credential_string: string; - remotePeerId: PeerId; }; +const DEFAULT_MAX_INBOUND_STREAMS = 32; +const DEFAULT_MAX_OUTBOUND_STREAMS = 64; +const OPEN_STREAM_TIMEOUT = 30_000; + export class WebRTCConnection implements ic.Connection { id: string; stat: ic.ConnectionStat; + localPeer: PeerId; + localAddr?: Multiaddr; remoteAddr: Multiaddr; remotePeer: PeerId; tags: string[] = []; - streams: ic.Stream[] = []; - direction: ic.Direction; + components: Components; + private _streams: Map = new Map(); private peerConnection: RTCPeerConnection; - private ufrag: string; constructor(init: ConnectionInit) { - this.streams = []; this.remoteAddr = init.remoteAddr; this.id = init.id; - this.direction = init.direction; this.peerConnection = init.pc; - this.ufrag = init.credential_string; + this.remotePeer = init.remotePeer; + this.localPeer = init.localPeer; + this.localAddr = init.localAddr; + this.components = init.components; this.stat = { - direction: 'outbound', - timeline: { open: 0 }, - status: 'CLOSED', + direction: init.direction, + status: 'OPEN', + timeline: { + open: new Date().getTime(), + }, }; - this.remotePeer = init.remotePeerId; - // for muxing incoming stream - // this._peerConnection.ondatachannel = ({ channel }) => { - // let stream = DataChannelStream(channel) - // this.addStream(stream) - // } + this.handleIncomingStreams(); + } + + private handleIncomingStreams() { + let metrics = this.components.getMetrics(); + this.peerConnection.ondatachannel = async ({ channel }) => { + let [openPromise, abortPromise] = [defer(), defer()]; + let controller = new TimeoutController(OPEN_STREAM_TIMEOUT); + controller.signal.onabort = () => abortPromise.resolve(); + channel.onopen = () => openPromise.resolve(); + + await Promise.race([openPromise, abortPromise]); + if (controller.signal.aborted) { + // TODO: Better errors + throw Error(controller.signal.reason); + } + + let rawStream = new WebRTCStream({ + channel, + stat: { + direction: 'inbound', + timeline: { + open: new Date().getTime(), + }, + }, + }); + let registrar = this.components.getRegistrar(); + let protocols = registrar.getProtocols(); + + let { stream, protocol } = await mshandle(rawStream, protocols, { signal: controller.signal }); + if (metrics) { + metrics.trackStream({ stream, protocol, remotePeer: this.remotePeer }); + } + + rawStream.stat.protocol = protocol; + let result = this.wrapMsStream(rawStream, stream); + + this.addStream(result); + + // handle stream + let { handler } = registrar.getHandler(protocol); + handler({ connection: this, stream: result }); + }; + } + + private wrapMsStream(rawStream: WebRTCStream, stream: Duplex>): ic.Stream { + return { + ...stream, + close: () => { + rawStream.close(); + }, + closeRead: () => { + rawStream.closeRead(); + }, + closeWrite: () => { + rawStream.closeWrite(); + }, + abort: (err) => { + rawStream.abort(err); + }, + reset: () => rawStream.reset(), + id: rawStream.id, + metadata: rawStream.metadata, + stat: rawStream.stat, + }; + } + + private findStreamLimit(protocol: string, direction: ic.Direction): number { + let registrar = this.components.getRegistrar(); + try { + let handler = registrar.getHandler(protocol); + return direction === 'inbound' ? handler.options.maxInboundStreams || DEFAULT_MAX_INBOUND_STREAMS : handler.options.maxOutboundStreams || DEFAULT_MAX_OUTBOUND_STREAMS; + } catch (err) {} + return direction === 'inbound' ? DEFAULT_MAX_INBOUND_STREAMS : DEFAULT_MAX_OUTBOUND_STREAMS; } - async newStream(multicodecs: string | string[], options?: AbortOptions): Promise { - // let label = uuid.v4() - // let dc = this._peerConnection.createDataChannel(label, {}) - // await datachannel opening - // return DataChannelStream(dc) - log('TODO', this.ufrag); - this.peerConnection.createDataChannel(genUuid()); - throw new Error('not implemented'); + private countStream(protocol: string, direction: ic.Direction): number { + return this.streams.filter((s) => s.stat.protocol === protocol && s.stat.direction === direction).length; + } + + async newStream(protocols: string | string[], options: AbortOptions = {}): Promise { + let label = genUuid(); + let openPromise = defer(); + let abortedPromise = defer(); + let controller: TimeoutController | undefined; + let metrics = this.components.getMetrics(); + let openError: Error | undefined; + + log.trace(`opening new stream with protocols: ${protocols}`); + + // timeout in case no abort options are provided + if (options.signal == null) { + log.trace(`[stream: ${label}] no abort signal provided, creating timeout controller`); + controller = new TimeoutController(OPEN_STREAM_TIMEOUT); + options.signal = controller.signal; + } + + options.signal.onabort = () => { + log.trace(`[stream: ${label}] abort called - ${options.signal?.reason}`); + openError = new Error(options.signal?.reason || 'aborted'); + abortedPromise.resolve(); + }; + + let channel = this.peerConnection.createDataChannel(label); + channel.onopen = (_evt) => { + openPromise.resolve(); + }; + channel.onerror = (_evt) => { + log.trace(`[stream: ${label}] data channel error: ${(_evt as RTCErrorEvent).error}`); + openError = new Error(`data channel error`); + abortedPromise.resolve(); + }; + + await Promise.race([openPromise, abortedPromise]); + + // check for error + if (openError) { + // TODO: Better errors + throw openError; + } + + let rawStream = new WebRTCStream({ + channel, + stat: { + direction: 'outbound', + timeline: { + open: new Date().getTime(), + }, + }, + }); + + let { stream, protocol } = await msselect(rawStream, protocols, { signal: options.signal }); + log.trace(`[stream ${label}] select protocol - ${protocol}`); + // check if stream is within limit after protocol has been negotiated + rawStream.stat.protocol = protocol; + let result = this.wrapMsStream(rawStream, stream); + // check if stream can be accomodated + if (metrics) { + metrics.trackStream({ stream, protocol, remotePeer: this.remotePeer }); + } + + this.addStream(result); + return result; } addStream(stream: ic.Stream): void { - throw new Error('not implemented'); + let protocol = stream.stat.protocol!; + let direction = stream.stat.direction; + if (this.countStream(protocol, direction) === this.findStreamLimit(protocol, direction)) { + log(`${direction} stream limit reached for protocol - ${protocol}`); + let err = new Error(`${direction} stream limit reached for protocol - ${protocol}`); + stream.abort(err); + throw err; + } + this._streams.set(stream.id, stream); } + removeStream(id: string): void { - throw new Error('not implemented'); + this._streams.delete(id); + } + + get streams(): ic.Stream[] { + return Array.from(this._streams.values()); } + async close(): Promise { - throw new Error('not implemented'); + this.peerConnection.close(); } } diff --git a/src/stream.ts b/src/stream.ts index ae44495..ea9d114 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -6,7 +6,7 @@ import { Sink } from 'it-stream-types'; import { pushable, Pushable } from 'it-pushable'; import defer, { DeferredPromise } from 'p-defer'; import merge from 'it-merge'; -import { Uint8ArrayList } from 'uint8arraylist' +import { Uint8ArrayList } from 'uint8arraylist'; // const log = logger('libp2p:webrtc:connection'); diff --git a/src/transport.ts b/src/transport.ts index 302bf72..9525260 100644 --- a/src/transport.ts +++ b/src/transport.ts @@ -10,22 +10,24 @@ import { CreateListenerOptions, DialOptions, Listener, symbol, Transport } from import { logger } from '@libp2p/logger'; import { Multiaddr } from '@multiformats/multiaddr'; import { v4 as genUuid } from 'uuid'; -import defer, { DeferredPromise } from 'p-defer'; +import defer, { DeferredPromise } from 'p-defer'; const log = logger('libp2p:webrtc:transport'); const utf8 = new TextEncoder(); export class WebRTCTransport implements Transport, Initializable { - private components: DeferredPromise = defer(); + private componentsPromise: DeferredPromise = defer(); + private components: Components | undefined; init(components: Components): void { - this.components.resolve(components) + this.componentsPromise.resolve(); + this.components = components; } async dial(ma: Multiaddr, options: DialOptions): Promise { - const rawConn = this._connect(ma, options); - log('new outbound connection %s', rawConn, genUuid()); - throw new Error('not implemented'); + const rawConn = await this._connect(ma, options); + log(`dialing address - ${ma}`); + return rawConn; } createListener(options: CreateListenerOptions): Listener { @@ -44,13 +46,10 @@ export class WebRTCTransport implements Transport, Initializable { return true; } - async _connect(ma: Multiaddr, options: WebRTCDialOptions) { - let registrar = (await this.components.promise).getRegistrar(); + async _connect(ma: Multiaddr, options: WebRTCDialOptions): Promise { let peerConnection = new RTCPeerConnection(); // create data channel let handshakeDataChannel = peerConnection.createDataChannel('data', { negotiated: true, id: 1 }); - // let handshakeChannel = peerConnection.createDataChannel("data", { id: 1 }) - // // // create offer sdp let offerSdp = await peerConnection.createOffer(); @@ -83,13 +82,13 @@ export class WebRTCTransport implements Transport, Initializable { let dataChannelOpenPromise = defer(); handshakeDataChannel.onopen = (_) => dataChannelOpenPromise.resolve(); setTimeout(dataChannelOpenPromise.reject, 10000); - await dataChannelOpenPromise; + await dataChannelOpenPromise.promise; - let myPeerId = this.components.getPeerId(); + let myPeerId = this.components!.getPeerId(); let rps = ma.getPeerId(); if (!rps) { throw new Error('TODO Do we really need a peer ID ?'); - } + } let theirPeerId = p.peerIdFromString(rps); // do noise handshake @@ -97,17 +96,17 @@ export class WebRTCTransport implements Transport, Initializable { // is the concatenation of the of the two TLS fingerprints of A and B in their multihash byte representation, sorted in ascending order. let fingerprintsPrologue = [myPeerId.multihash, theirPeerId.multihash].sort().join(''); let noise = new Noise(myPeerId.privateKey, undefined, stablelib, utf8.encode(fingerprintsPrologue)); - let wrappedChannel = new WebRTCStream({ channel: handshakeDataChannel, stat: {direction: 'outbound', timeline: {open: 0}} }); + let wrappedChannel = new WebRTCStream({ channel: handshakeDataChannel, stat: { direction: 'outbound', timeline: { open: 0 } } }); await noise.secureOutbound(myPeerId, wrappedChannel, theirPeerId); return new WebRTCConnection({ + components: this.components!, id: ma.toString(), remoteAddr: ma, localPeer: myPeerId, direction: 'outbound', pc: peerConnection, - credential_string: ufrag, - remotePeerId: theirPeerId, + remotePeer: theirPeerId, }); } } diff --git a/test/connection.spec.ts b/test/connection.spec.ts new file mode 100644 index 0000000..a88f9a3 --- /dev/null +++ b/test/connection.spec.ts @@ -0,0 +1,2 @@ + +export {}; diff --git a/test/util.ts b/test/util.ts new file mode 100644 index 0000000..706dc41 --- /dev/null +++ b/test/util.ts @@ -0,0 +1,6 @@ +import * as ic from '@libp2p/interface-connection' +import { createEd25519PeerId } from '@libp2p/peer-id-factory'; + +export async function createConnection(pc: RTCPeerConnection, direction: ic.Direction) { + let peerId = await createEd25519PeerId(); +} From cac0f86f38b23916cf2d7ebd32974dcb833e9ad8 Mon Sep 17 00:00:00 2001 From: Chinmay Kousik Date: Thu, 11 Aug 2022 22:39:46 +0530 Subject: [PATCH 3/5] tests --- package.json | 9 ++- src/connection.ts | 15 ++++- src/stream.ts | 19 ++++-- src/transport.ts | 15 ++++- test/connection.browser.spec.ts | 49 ++++++++++++++ test/connection.spec.ts | 2 - test/util.ts | 110 +++++++++++++++++++++++++++++++- 7 files changed, 202 insertions(+), 17 deletions(-) create mode 100644 test/connection.browser.spec.ts delete mode 100644 test/connection.spec.ts diff --git a/package.json b/package.json index 844f3bb..ce9a6ff 100644 --- a/package.json +++ b/package.json @@ -9,7 +9,7 @@ "scripts": { "build": "aegir build", "clean": "rm -rfv node_modules dist *.lock *-lock.json ", - "test": "aegir test", + "test": "aegir test --target browser", "format": "prettier --write src/*.ts" }, "devDependencies": { @@ -17,11 +17,14 @@ "@types/uuid": "^8.3.4", "@typescript-eslint/parser": "^5.32.0", "aegir": "^37.4.6", + "it-all": "^1.0.6", + "it-first": "^1.0.7", "prettier": "^2.7.1", - "typescript": "^4.7.4" + "typescript": "^4.7.4", + "uint8arrays": "^3.1.0" }, "dependencies": { - "@chainsafe/libp2p-noise": "git://github.com/ChainSafe/js-libp2p-noise.git#15f7a6e700a69c9a40abb82d989a55032d5cf687", + "@chainsafe/libp2p-noise": "^8.0.0", "@libp2p/components": "^2.0.3", "@libp2p/interface-connection": "^3.0.1", "@libp2p/interface-registrar": "^2.0.3", diff --git a/src/connection.ts b/src/connection.ts index ff2204f..feb996f 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -64,12 +64,14 @@ export class WebRTCConnection implements ic.Connection { private handleIncomingStreams() { let metrics = this.components.getMetrics(); this.peerConnection.ondatachannel = async ({ channel }) => { + const logPrefix = `[stream:${channel.label}][inbound]`; + log.trace(`incoming stream - ${channel.label}`); let [openPromise, abortPromise] = [defer(), defer()]; let controller = new TimeoutController(OPEN_STREAM_TIMEOUT); controller.signal.onabort = () => abortPromise.resolve(); channel.onopen = () => openPromise.resolve(); - await Promise.race([openPromise, abortPromise]); + await Promise.race([openPromise.promise, abortPromise.promise]); if (controller.signal.aborted) { // TODO: Better errors throw Error(controller.signal.reason); @@ -87,11 +89,15 @@ export class WebRTCConnection implements ic.Connection { let registrar = this.components.getRegistrar(); let protocols = registrar.getProtocols(); + log.trace(`${logPrefix} supported protocols - ${protocols}`); + let { stream, protocol } = await mshandle(rawStream, protocols, { signal: controller.signal }); if (metrics) { metrics.trackStream({ stream, protocol, remotePeer: this.remotePeer }); } + log.trace(`${logPrefix} handled protocol - ${protocol}`); + rawStream.stat.protocol = protocol; let result = this.wrapMsStream(rawStream, stream); @@ -139,7 +145,7 @@ export class WebRTCConnection implements ic.Connection { } async newStream(protocols: string | string[], options: AbortOptions = {}): Promise { - let label = genUuid(); + let label = genUuid().slice(0, 8); let openPromise = defer(); let abortedPromise = defer(); let controller: TimeoutController | undefined; @@ -161,8 +167,10 @@ export class WebRTCConnection implements ic.Connection { abortedPromise.resolve(); }; + log.trace(`[stream: ${label}] peerconnection state: ${this.peerConnection.connectionState}`); let channel = this.peerConnection.createDataChannel(label); channel.onopen = (_evt) => { + log.trace(`[stream: ${label}] data channel opened`); openPromise.resolve(); }; channel.onerror = (_evt) => { @@ -171,7 +179,8 @@ export class WebRTCConnection implements ic.Connection { abortedPromise.resolve(); }; - await Promise.race([openPromise, abortedPromise]); + log.trace(`[stream: ${label}] datachannel state: ${channel.readyState}`); + await Promise.race([openPromise.promise, abortedPromise.promise]); // check for error if (openError) { diff --git a/src/stream.ts b/src/stream.ts index ea9d114..3fd717f 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -1,14 +1,15 @@ import { Stream } from '@libp2p/interface-connection'; import { StreamStat } from '@libp2p/interface-connection'; -// import { logger } from '@libp2p/logger'; import { Source } from 'it-stream-types'; import { Sink } from 'it-stream-types'; import { pushable, Pushable } from 'it-pushable'; import defer, { DeferredPromise } from 'p-defer'; import merge from 'it-merge'; import { Uint8ArrayList } from 'uint8arraylist'; +import { fromString } from 'uint8arrays/from-string'; +import { logger } from '@libp2p/logger'; -// const log = logger('libp2p:webrtc:connection'); +const log = logger('libp2p:webrtc:stream'); type StreamInitOpts = { channel: RTCDataChannel; @@ -43,6 +44,8 @@ export class WebRTCStream implements Stream { readClosed: boolean = false; closed: boolean = false; + // testing + constructor(opts: StreamInitOpts) { this.channel = opts.channel; this.id = this.channel.label; @@ -77,7 +80,15 @@ export class WebRTCStream implements Stream { if (this.readClosed || this.closed) { return; } - (this.source as Pushable).push(data); + + let res: Uint8Array; + if (typeof data == 'string') { + res = fromString(data); + } else { + res = new Uint8Array(data as ArrayBuffer); + } + log.trace(`[stream:${this.id}][${this.stat.direction}] received message: length: ${res.length} ${res}`); + (this.source as Pushable).push(new Uint8ArrayList(res)); }; this.channel.onclose = (_evt) => { @@ -131,7 +142,7 @@ export class WebRTCStream implements Stream { */ closeRead(): void { this.readClosed = true; - (this.source as Pushable).end(); + (this.source as Pushable).end(); if (this.readClosed && this.writeClosed) { this.close(); } diff --git a/src/transport.ts b/src/transport.ts index 9525260..2fb137b 100644 --- a/src/transport.ts +++ b/src/transport.ts @@ -96,8 +96,19 @@ export class WebRTCTransport implements Transport, Initializable { // is the concatenation of the of the two TLS fingerprints of A and B in their multihash byte representation, sorted in ascending order. let fingerprintsPrologue = [myPeerId.multihash, theirPeerId.multihash].sort().join(''); let noise = new Noise(myPeerId.privateKey, undefined, stablelib, utf8.encode(fingerprintsPrologue)); - let wrappedChannel = new WebRTCStream({ channel: handshakeDataChannel, stat: { direction: 'outbound', timeline: { open: 0 } } }); - await noise.secureOutbound(myPeerId, wrappedChannel, theirPeerId); + let wrappedChannel = new WebRTCStream({ channel: handshakeDataChannel, stat: { direction: 'outbound', timeline: { open: 1 } } }); + let wrappedDuplex = { + ...wrappedChannel, + source: { + [Symbol.asyncIterator]: async function* () { + for await (const list of wrappedChannel.source) { + yield list.subarray(); + } + }, + }, + }; + + await noise.secureOutbound(myPeerId, wrappedDuplex, theirPeerId); return new WebRTCConnection({ components: this.components!, diff --git a/test/connection.browser.spec.ts b/test/connection.browser.spec.ts new file mode 100644 index 0000000..9b27227 --- /dev/null +++ b/test/connection.browser.spec.ts @@ -0,0 +1,49 @@ +/* eslint-env mocha */ + +import {createConnectionPair, echoHandler} from "./util"; +import { expect } from 'aegir/chai'; +import { pipe } from 'it-pipe'; +import all from 'it-all'; +import first from 'it-first'; +import {fromString} from 'uint8arrays/from-string'; +import {v4} from 'uuid'; + +const echoProtocol = "/echo/1.0.0" + +describe('connection browser tests', () => { + it('can run the echo protocol (first)', async () => { + let [{ connection: client }, server] = await createConnectionPair(); + let serverRegistrar = server.registrar; + await serverRegistrar.handle(echoProtocol, echoHandler, { maxInboundStreams: 10, maxOutboundStreams: 10 }) + let clientStream = await client.newStream([echoProtocol]); + let data = fromString(v4()); + let response = await pipe( + [data], + clientStream, + async (source) => await first(source), + ); + + expect(response).to.not.be.undefined; + expect(response!.subarray()).to.equalBytes(data); + }); + + it('can run the echo protocol (all)', async () => { + let [{ connection: client }, server] = await createConnectionPair(); + let serverRegistrar = server.registrar; + await serverRegistrar.handle(echoProtocol, echoHandler, { maxInboundStreams: 10, maxOutboundStreams: 10 }) + let clientStream = await client.newStream([echoProtocol]); + // close stream after 2 seconds + setTimeout(() => clientStream.close(), 2000); + let data = fromString(v4()); + let response = await pipe( + [data], + clientStream, + async (source) => await all(source), + ); + + expect(response).to.not.be.undefined; + expect(response![0].subarray()).to.equalBytes(data); + }); +}); + +export {}; diff --git a/test/connection.spec.ts b/test/connection.spec.ts deleted file mode 100644 index a88f9a3..0000000 --- a/test/connection.spec.ts +++ /dev/null @@ -1,2 +0,0 @@ - -export {}; diff --git a/test/util.ts b/test/util.ts index 706dc41..ef8b3ea 100644 --- a/test/util.ts +++ b/test/util.ts @@ -1,6 +1,110 @@ import * as ic from '@libp2p/interface-connection' -import { createEd25519PeerId } from '@libp2p/peer-id-factory'; +import {createEd25519PeerId} from '@libp2p/peer-id-factory'; +import {mockRegistrar, mockUpgrader} from '@libp2p/interface-mocks'; +import {Components} from '@libp2p/components'; +import defer, {DeferredPromise} from 'p-defer'; +import {WebRTCConnection} from '../src/connection'; +import {Multiaddr} from '@multiformats/multiaddr'; +import {v4} from 'uuid'; +import {Registrar, StreamHandler} from '@libp2p/interface-registrar'; +import { pipe } from 'it-pipe'; +import { logger } from '@libp2p/logger'; -export async function createConnection(pc: RTCPeerConnection, direction: ic.Direction) { - let peerId = await createEd25519PeerId(); +const log = logger('libp2p:webrtc:test:util'); + +export const echoHandler: StreamHandler = ({ stream }) => pipe(stream.source, stream.sink); + +export async function createConnectedRTCPeerConnectionPair(): Promise { + let [client, server] = [new RTCPeerConnection(), new RTCPeerConnection()]; + log('created peer connections'); + // we don't need auth for a local test but we need a component for candidate gathering + client.createDataChannel('data'); + client.onicecandidate = ({candidate}) => { + if (candidate !== null) { + server.addIceCandidate(candidate); + } + }; + server.onicecandidate = ({candidate}) => { + if (candidate !== null) { + client.addIceCandidate(candidate); + } + }; + let resolveOnConnect = (pc: RTCPeerConnection): DeferredPromise => { + let promise: DeferredPromise = defer(); + pc.onconnectionstatechange = (_evt) => { + switch (pc.connectionState) { + case 'connected': + log.trace('pc connected'); + promise.resolve(); + return; + case 'failed': + case 'disconnected': + promise.reject(); + return; + } + }; + return promise; + } + + let clientConnected = resolveOnConnect(client); + let serverConnected = resolveOnConnect(server); + log('set callbacks on peerconnections'); + + let clientOffer = await client.createOffer(); + await client.setLocalDescription(clientOffer); + await server.setRemoteDescription(clientOffer); + let serverAnswer = await server.createAnswer(); + await server.setLocalDescription(serverAnswer); + await client.setRemoteDescription(serverAnswer); + log('completed sdp exchange'); + + await Promise.all([clientConnected.promise, serverConnected.promise]) + + log.trace(`clientstate: ${client.connectionState}, serverstate: ${server.connectionState}`) + + // let dc = client.createDataChannel('test'); + // log.trace('awaiting test datachannel opening'); + // await new Promise((res) => { + // dc.onopen = () => res(); + // }); + + log('created peer connections'); + return [client, server]; +} + +export async function createConnectionPair(): Promise<{ connection: ic.Connection, registrar: Registrar }[]> { + let [clientPeerId, serverPeerId] = await Promise.all([createEd25519PeerId(), createEd25519PeerId()]); + let [clientRegistrar, serverRegistrar] = [mockRegistrar(), mockRegistrar()]; + let upgrader = mockUpgrader(); + let [client, server] = await createConnectedRTCPeerConnectionPair(); + let clientConnection = new WebRTCConnection({ + id: v4(), + pc: client, + localPeer: clientPeerId, + remotePeer: serverPeerId, + remoteAddr: new Multiaddr(), + components: new Components({ + peerId: clientPeerId, + registrar: clientRegistrar, + upgrader: upgrader, + }), + direction: 'outbound', + }); + let serverConnection = new WebRTCConnection({ + id: v4(), + pc: server, + localPeer: serverPeerId, + remotePeer: clientPeerId, + remoteAddr: new Multiaddr(), + components: new Components({ + peerId: serverPeerId, + registrar: serverRegistrar, + upgrader: upgrader, + }), + direction: 'inbound', + }); + return [ + { connection: clientConnection, registrar: clientRegistrar }, + { connection: serverConnection, registrar: serverRegistrar }, + ]; } From 529df965083a8e244477abecb65f2f79fa88d499 Mon Sep 17 00:00:00 2001 From: John Turpish Date: Thu, 11 Aug 2022 13:14:32 -0400 Subject: [PATCH 4/5] about to merge --- package.json | 5 +++-- src/transport.ts | 12 +++++++++++- test/browser.ts | 12 ++++++++++++ test/node.js | 6 ------ test/stream.spec.ts | 12 ------------ 5 files changed, 26 insertions(+), 21 deletions(-) create mode 100644 test/browser.ts delete mode 100644 test/node.js delete mode 100644 test/stream.spec.ts diff --git a/package.json b/package.json index a096f18..83672aa 100644 --- a/package.json +++ b/package.json @@ -10,6 +10,7 @@ "build": "aegir build", "clean": "rm -rfv node_modules dist *.lock *-lock.json ", "test": "aegir test", + "test:browser": "aegir test --target browser", "format": "prettier --write src/*.ts" }, "devDependencies": { @@ -21,9 +22,9 @@ "typescript": "^4.7.4" }, "dependencies": { - "@chainsafe/libp2p-noise": "git://github.com/ChainSafe/js-libp2p-noise.git#15f7a6e700a69c9a40abb82d989a55032d5cf687", + "@chainsafe/libp2p-noise": "^8.0.0", "@libp2p/components": "^2.0.3", - "@libp2p/interface-connection": "^3.0.0", + "@libp2p/interface-connection": "^3.0.1", "@libp2p/interface-transport": "^1.0.3", "@libp2p/interfaces": "^3.0.3", "@libp2p/logger": "^2.0.0", diff --git a/src/transport.ts b/src/transport.ts index e26630f..d43c7ef 100644 --- a/src/transport.ts +++ b/src/transport.ts @@ -99,7 +99,17 @@ export class WebRTCTransport implements Transport, Initializable { let fingerprintsPrologue = [myPeerId.multihash, theirPeerId.multihash].sort().join(''); let noise = new Noise(myPeerId.privateKey, undefined, stablelib, utf8.encode(fingerprintsPrologue)); let wrappedChannel = new WebRTCStream({ channel: handshakeDataChannel, stat: { direction: 'outbound', timeline: { open: 0 } } }); - await noise.secureOutbound(myPeerId, wrappedChannel, theirPeerId); + let wrappedDuplex = { + ...wrappedChannel, + source: { + [Symbol.asyncIterator]: async function* () { + for await (const list of wrappedChannel.source) { + yield list.subarray(); + } + }, + }, + }; + await noise.secureOutbound(myPeerId, wrappedDuplex, theirPeerId); return new WebRTCConnection({ id: ma.toString(), diff --git a/test/browser.ts b/test/browser.ts new file mode 100644 index 0000000..9e12829 --- /dev/null +++ b/test/browser.ts @@ -0,0 +1,12 @@ +import { expect } from 'chai'; +import * as underTest from '../src/stream.js'; + +describe('stream stats', () => { + it('can construct', () => { + let pc = new RTCPeerConnection(); + let dc = pc.createDataChannel('whatever', { negotiated: true, id: 91 }); + let s = new underTest.WebRTCStream({ channel: dc }); + expect(s.stat.timeline.close).to.be.null; + expect(9).to.equal(3); + }); +}); diff --git a/test/node.js b/test/node.js deleted file mode 100644 index 5c78a05..0000000 --- a/test/node.js +++ /dev/null @@ -1,6 +0,0 @@ -/* eslint-env mocha */ - -export {} - -describe('noop', () => { -}) diff --git a/test/stream.spec.ts b/test/stream.spec.ts deleted file mode 100644 index b041654..0000000 --- a/test/stream.spec.ts +++ /dev/null @@ -1,12 +0,0 @@ -import { expect } from 'chai'; -import * as underTest from '../src/stream.js'; -// import { RTCPeerConnection } from 'rtc'; - -describe('stream stats', () => { - it('can construct', () => { - // let pc = new RTCPeerConnection(); - // let dc = pc.createDataChannel('whatever', { negotiated: true, id: 91 }); - let s = new underTest.WebRTCStream({ channel: {} }); - expect(s.stat.timeline.close).to.be.null; - }); -}); From fd56251ddef03c1d0374cc49bb16f7cede891d10 Mon Sep 17 00:00:00 2001 From: John Turpish Date: Thu, 11 Aug 2022 14:53:54 -0400 Subject: [PATCH 5/5] Every public method covered with a test case --- src/stream.ts | 14 +++++- test/browser.ts | 12 ----- test/stream.browser.spec.ts | 97 +++++++++++++++++++++++++++++++++++++ 3 files changed, 109 insertions(+), 14 deletions(-) delete mode 100644 test/browser.ts create mode 100644 test/stream.browser.spec.ts diff --git a/src/stream.ts b/src/stream.ts index 3fd717f..f0b2f8d 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -1,5 +1,4 @@ -import { Stream } from '@libp2p/interface-connection'; -import { StreamStat } from '@libp2p/interface-connection'; +import { Stream, StreamStat, Direction } from '@libp2p/interface-connection'; import { Source } from 'it-stream-types'; import { Sink } from 'it-stream-types'; import { pushable, Pushable } from 'it-pushable'; @@ -11,6 +10,16 @@ import { logger } from '@libp2p/logger'; const log = logger('libp2p:webrtc:stream'); +export function defaultStat(dir: Direction): StreamStat { + return { + direction: dir, + timeline: { + open: 0, + close: undefined, + }, + }; +} + type StreamInitOpts = { channel: RTCDataChannel; metadata?: Record; @@ -171,5 +180,6 @@ export class WebRTCStream implements Stream { */ reset(): void { this.close(); + this.stat = defaultStat(this.stat.direction); } } diff --git a/test/browser.ts b/test/browser.ts deleted file mode 100644 index 9e12829..0000000 --- a/test/browser.ts +++ /dev/null @@ -1,12 +0,0 @@ -import { expect } from 'chai'; -import * as underTest from '../src/stream.js'; - -describe('stream stats', () => { - it('can construct', () => { - let pc = new RTCPeerConnection(); - let dc = pc.createDataChannel('whatever', { negotiated: true, id: 91 }); - let s = new underTest.WebRTCStream({ channel: dc }); - expect(s.stat.timeline.close).to.be.null; - expect(9).to.equal(3); - }); -}); diff --git a/test/stream.browser.spec.ts b/test/stream.browser.spec.ts new file mode 100644 index 0000000..af6d48e --- /dev/null +++ b/test/stream.browser.spec.ts @@ -0,0 +1,97 @@ +import { expect } from 'chai'; +import * as underTest from '../src/stream.js'; + +describe('stream stats', () => { + it('can construct', () => { + let pc = new RTCPeerConnection(); + let dc = pc.createDataChannel('whatever', { negotiated: true, id: 91 }); + let s = new underTest.WebRTCStream({ channel: dc, stat: underTest.defaultStat('outbound') }); + expect(s.stat.timeline.close).to.not.exist(); + }); + + it('close marks it closed', () => { + let pc = new RTCPeerConnection(); + let dc = pc.createDataChannel('whatever', { negotiated: true, id: 91 }); + let s = new underTest.WebRTCStream({ channel: dc, stat: underTest.defaultStat('outbound') }); + expect(s.closed).to.equal(false); + expect(s.readClosed).to.equal(false); + expect(s.writeClosed).to.equal(false); + expect(s.stat.timeline.close).to.not.exist(); + s.close(); + expect(s.closed).to.equal(true); + expect(s.readClosed).to.equal(true); + expect(s.writeClosed).to.equal(true); + expect(s.stat.timeline.close).to.exist(); + }); + + it('closeRead marks it read-closed only', () => { + let pc = new RTCPeerConnection(); + let dc = pc.createDataChannel('whatever', { negotiated: true, id: 91 }); + let s = new underTest.WebRTCStream({ channel: dc, stat: underTest.defaultStat('outbound') }); + expect(s.closed).to.equal(false); + expect(s.readClosed).to.equal(false); + expect(s.writeClosed).to.equal(false); + s.closeRead(); + expect(s.closed).to.equal(false); + expect(s.readClosed).to.equal(true); + expect(s.writeClosed).to.equal(false); + }); + + it('closeWrite marks it write-closed only', () => { + let pc = new RTCPeerConnection(); + let dc = pc.createDataChannel('whatever', { negotiated: true, id: 91 }); + let s = new underTest.WebRTCStream({ channel: dc, stat: underTest.defaultStat('outbound') }); + expect(s.closed).to.equal(false); + expect(s.readClosed).to.equal(false); + expect(s.writeClosed).to.equal(false); + s.closeWrite(); + expect(s.closed).to.equal(false); + expect(s.readClosed).to.equal(false); + expect(s.writeClosed).to.equal(true); + }); + + it('closeWrite AND closeRead = close', () => { + let pc = new RTCPeerConnection(); + let dc = pc.createDataChannel('whatever', { negotiated: true, id: 91 }); + let s = new underTest.WebRTCStream({ channel: dc, stat: underTest.defaultStat('outbound') }); + expect(s.closed).to.equal(false); + expect(s.readClosed).to.equal(false); + expect(s.writeClosed).to.equal(false); + s.closeRead(); + s.closeWrite(); + expect(s.closed).to.equal(true); + expect(s.readClosed).to.equal(true); + expect(s.writeClosed).to.equal(true); + }); + + it('abort = close', () => { + let pc = new RTCPeerConnection(); + let dc = pc.createDataChannel('whatever', { negotiated: true, id: 91 }); + let s = new underTest.WebRTCStream({ channel: dc, stat: underTest.defaultStat('outbound') }); + expect(s.closed).to.equal(false); + expect(s.readClosed).to.equal(false); + expect(s.writeClosed).to.equal(false); + expect(s.stat.timeline.close).to.not.exist(); + s.abort({ name: 'irrelevant', message: 'this parameter is actually ignored' }); + expect(s.closed).to.equal(true); + expect(s.readClosed).to.equal(true); + expect(s.writeClosed).to.equal(true); + expect(s.stat.timeline.close).to.exist(); + expect(s.stat.timeline.close).to.be.greaterThan(s.stat.timeline.open); + }); + + it('reset = close + newStat', () => { + let pc = new RTCPeerConnection(); + let dc = pc.createDataChannel('whatever', { negotiated: true, id: 91 }); + let s = new underTest.WebRTCStream({ channel: dc, stat: underTest.defaultStat('outbound') }); + expect(s.closed).to.equal(false); + expect(s.readClosed).to.equal(false); + expect(s.writeClosed).to.equal(false); + expect(s.stat.timeline.close).to.not.exist(); + s.reset(); + expect(s.closed).to.equal(true); + expect(s.readClosed).to.equal(true); + expect(s.writeClosed).to.equal(true); + expect(s.stat.timeline.close).to.not.exist(); + }); +});