Skip to content
This repository has been archived by the owner on Jun 19, 2023. It is now read-only.

Commit

Permalink
Merge pull request #15 from little-bear-labs/jt/con-439_streamTest
Browse files Browse the repository at this point in the history
Unit testing Stream
  • Loading branch information
John-LittleBearLabs committed Aug 12, 2022
2 parents da01296 + 819d243 commit 1ab83be
Show file tree
Hide file tree
Showing 7 changed files with 516 additions and 56 deletions.
14 changes: 11 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,36 @@
"scripts": {
"build": "aegir build",
"test": "aegir test",
"test:browser": "aegir test --target browser",
"format": "prettier --write src/*.ts"
},
"devDependencies": {
"@libp2p/interface-mocks": "^4.0.1",
"@types/uuid": "^8.3.4",
"@typescript-eslint/parser": "^5.32.0",
"aegir": "^37.4.6",
"it-all": "^1.0.6",
"it-first": "^1.0.7",
"prettier": "^2.7.1",
"typescript": "^4.7.4"
"typescript": "^4.7.4",
"uint8arrays": "^3.1.0"
},
"dependencies": {
"@chainsafe/libp2p-noise": "git://github.com/ChainSafe/js-libp2p-noise.git#15f7a6e700a69c9a40abb82d989a55032d5cf687",
"@chainsafe/libp2p-noise": "^8.0.0",
"@libp2p/components": "^2.0.3",
"@libp2p/interface-connection": "^3.0.0",
"@libp2p/interface-connection": "^3.0.1",
"@libp2p/interface-registrar": "^2.0.3",
"@libp2p/interface-transport": "^1.0.3",
"@libp2p/interfaces": "^3.0.3",
"@libp2p/logger": "^2.0.0",
"@libp2p/multistream-select": "^3.0.0",
"@libp2p/peer-id": "^1.1.15",
"@multiformats/multiaddr": "../js-multiaddr/",
"abortable-iterator": "^4.0.2",
"it-merge": "^1.0.4",
"p-defer": "^4.0.0",
"socket.io-client": "^4.1.2",
"timeout-abort-controller": "^3.0.0",
"uuid": "^8.3.2"
}
}
221 changes: 193 additions & 28 deletions src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,70 +4,235 @@ import { AbortOptions } from '@libp2p/interfaces';
import { logger } from '@libp2p/logger';
import { Multiaddr } from '@multiformats/multiaddr';
import { v4 as genUuid } from 'uuid';
import { Components } from '@libp2p/components';
import defer from 'p-defer';
import { TimeoutController } from 'timeout-abort-controller';
import { WebRTCStream } from './stream';
import { select as msselect, handle as mshandle } from '@libp2p/multistream-select';
import { Duplex } from 'it-stream-types';
import { Uint8ArrayList } from 'uint8arraylist';

const log = logger('libp2p:webrtc:connection');

type ConnectionInit = {
components: Components;
id: string;
localPeer: PeerId;
localAddr?: Multiaddr;
remoteAddr: Multiaddr;
remotePeer: PeerId;
direction: ic.Direction;
tags?: string[];
pc: RTCPeerConnection;
credential_string: string;
remotePeerId: PeerId;
};

const DEFAULT_MAX_INBOUND_STREAMS = 32;
const DEFAULT_MAX_OUTBOUND_STREAMS = 64;
const OPEN_STREAM_TIMEOUT = 30_000;

export class WebRTCConnection implements ic.Connection {
id: string;
stat: ic.ConnectionStat;
localPeer: PeerId;
localAddr?: Multiaddr;
remoteAddr: Multiaddr;
remotePeer: PeerId;
tags: string[] = [];
streams: ic.Stream[] = [];
direction: ic.Direction;
components: Components;

private _streams: Map<string, ic.Stream> = new Map();
private peerConnection: RTCPeerConnection;
private ufrag: string;

constructor(init: ConnectionInit) {
this.streams = [];
this.remoteAddr = init.remoteAddr;
this.id = init.id;
this.direction = init.direction;
this.peerConnection = init.pc;
this.ufrag = init.credential_string;
this.remotePeer = init.remotePeer;
this.localPeer = init.localPeer;
this.localAddr = init.localAddr;
this.components = init.components;
this.stat = {
direction: 'outbound',
timeline: { open: 0 },
status: 'CLOSED',
direction: init.direction,
status: 'OPEN',
timeline: {
open: new Date().getTime(),
},
};
this.handleIncomingStreams();
}

private handleIncomingStreams() {
let metrics = this.components.getMetrics();
this.peerConnection.ondatachannel = async ({ channel }) => {
const logPrefix = `[stream:${channel.label}][inbound]`;
log.trace(`incoming stream - ${channel.label}`);
let [openPromise, abortPromise] = [defer(), defer()];
let controller = new TimeoutController(OPEN_STREAM_TIMEOUT);
controller.signal.onabort = () => abortPromise.resolve();
channel.onopen = () => openPromise.resolve();

await Promise.race([openPromise.promise, abortPromise.promise]);
if (controller.signal.aborted) {
// TODO: Better errors
throw Error(controller.signal.reason);
}

let rawStream = new WebRTCStream({
channel,
stat: {
direction: 'inbound',
timeline: {
open: new Date().getTime(),
},
},
});
let registrar = this.components.getRegistrar();
let protocols = registrar.getProtocols();

log.trace(`${logPrefix} supported protocols - ${protocols}`);

let { stream, protocol } = await mshandle(rawStream, protocols, { signal: controller.signal });
if (metrics) {
metrics.trackStream({ stream, protocol, remotePeer: this.remotePeer });
}

log.trace(`${logPrefix} handled protocol - ${protocol}`);

rawStream.stat.protocol = protocol;
let result = this.wrapMsStream(rawStream, stream);

this.addStream(result);

// handle stream
let { handler } = registrar.getHandler(protocol);
handler({ connection: this, stream: result });
};
this.remotePeer = init.remotePeerId;
// for muxing incoming stream
// this._peerConnection.ondatachannel = ({ channel }) => {
// let stream = DataChannelStream(channel)
// this.addStream(stream)
// }
}

async newStream(multicodecs: string | string[], options?: AbortOptions): Promise<ic.Stream> {
// let label = uuid.v4()
// let dc = this._peerConnection.createDataChannel(label, {})
// await datachannel opening
// return DataChannelStream(dc)
log('TODO', this.ufrag);
this.peerConnection.createDataChannel(genUuid());
throw new Error('not implemented');
private wrapMsStream(rawStream: WebRTCStream, stream: Duplex<Uint8ArrayList, Uint8ArrayList | Uint8Array, Promise<void>>): ic.Stream {
return {
...stream,
close: () => {
rawStream.close();
},
closeRead: () => {
rawStream.closeRead();
},
closeWrite: () => {
rawStream.closeWrite();
},
abort: (err) => {
rawStream.abort(err);
},
reset: () => rawStream.reset(),
id: rawStream.id,
metadata: rawStream.metadata,
stat: rawStream.stat,
};
}

private findStreamLimit(protocol: string, direction: ic.Direction): number {
let registrar = this.components.getRegistrar();
try {
let handler = registrar.getHandler(protocol);
return direction === 'inbound' ? handler.options.maxInboundStreams || DEFAULT_MAX_INBOUND_STREAMS : handler.options.maxOutboundStreams || DEFAULT_MAX_OUTBOUND_STREAMS;
} catch (err) {}
return direction === 'inbound' ? DEFAULT_MAX_INBOUND_STREAMS : DEFAULT_MAX_OUTBOUND_STREAMS;
}

private countStream(protocol: string, direction: ic.Direction): number {
return this.streams.filter((s) => s.stat.protocol === protocol && s.stat.direction === direction).length;
}

async newStream(protocols: string | string[], options: AbortOptions = {}): Promise<ic.Stream> {
let label = genUuid().slice(0, 8);
let openPromise = defer();
let abortedPromise = defer();
let controller: TimeoutController | undefined;
let metrics = this.components.getMetrics();
let openError: Error | undefined;

log.trace(`opening new stream with protocols: ${protocols}`);

// timeout in case no abort options are provided
if (options.signal == null) {
log.trace(`[stream: ${label}] no abort signal provided, creating timeout controller`);
controller = new TimeoutController(OPEN_STREAM_TIMEOUT);
options.signal = controller.signal;
}

options.signal.onabort = () => {
log.trace(`[stream: ${label}] abort called - ${options.signal?.reason}`);
openError = new Error(options.signal?.reason || 'aborted');
abortedPromise.resolve();
};

log.trace(`[stream: ${label}] peerconnection state: ${this.peerConnection.connectionState}`);
let channel = this.peerConnection.createDataChannel(label);
channel.onopen = (_evt) => {
log.trace(`[stream: ${label}] data channel opened`);
openPromise.resolve();
};
channel.onerror = (_evt) => {
log.trace(`[stream: ${label}] data channel error: ${(_evt as RTCErrorEvent).error}`);
openError = new Error(`data channel error`);
abortedPromise.resolve();
};

log.trace(`[stream: ${label}] datachannel state: ${channel.readyState}`);
await Promise.race([openPromise.promise, abortedPromise.promise]);

// check for error
if (openError) {
// TODO: Better errors
throw openError;
}

let rawStream = new WebRTCStream({
channel,
stat: {
direction: 'outbound',
timeline: {
open: new Date().getTime(),
},
},
});

let { stream, protocol } = await msselect(rawStream, protocols, { signal: options.signal });
log.trace(`[stream ${label}] select protocol - ${protocol}`);
// check if stream is within limit after protocol has been negotiated
rawStream.stat.protocol = protocol;
let result = this.wrapMsStream(rawStream, stream);
// check if stream can be accomodated
if (metrics) {
metrics.trackStream({ stream, protocol, remotePeer: this.remotePeer });
}

this.addStream(result);
return result;
}

addStream(stream: ic.Stream): void {
throw new Error('not implemented');
let protocol = stream.stat.protocol!;
let direction = stream.stat.direction;
if (this.countStream(protocol, direction) === this.findStreamLimit(protocol, direction)) {
log(`${direction} stream limit reached for protocol - ${protocol}`);
let err = new Error(`${direction} stream limit reached for protocol - ${protocol}`);
stream.abort(err);
throw err;
}
this._streams.set(stream.id, stream);
}

removeStream(id: string): void {
throw new Error('not implemented');
this._streams.delete(id);
}

get streams(): ic.Stream[] {
return Array.from(this._streams.values());
}

async close(): Promise<void> {
throw new Error('not implemented');
this.peerConnection.close();
}
}
37 changes: 29 additions & 8 deletions src/stream.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
import { Stream } from '@libp2p/interface-connection';
import { StreamStat } from '@libp2p/interface-connection';
// import { logger } from '@libp2p/logger';
import { Stream, StreamStat, Direction } from '@libp2p/interface-connection';
import { Source } from 'it-stream-types';
import { Sink } from 'it-stream-types';
import { pushable, Pushable } from 'it-pushable';
import defer, { DeferredPromise } from 'p-defer';
import merge from 'it-merge';
import { Uint8ArrayList } from 'uint8arraylist'

// const log = logger('libp2p:webrtc:connection');
import { Uint8ArrayList } from 'uint8arraylist';
import { fromString } from 'uint8arrays/from-string';
import { logger } from '@libp2p/logger';

const log = logger('libp2p:webrtc:stream');

export function defaultStat(dir: Direction): StreamStat {
return {
direction: dir,
timeline: {
open: 0,
close: undefined,
},
};
}

type StreamInitOpts = {
channel: RTCDataChannel;
Expand Down Expand Up @@ -43,6 +53,8 @@ export class WebRTCStream implements Stream {
readClosed: boolean = false;
closed: boolean = false;

// testing

constructor(opts: StreamInitOpts) {
this.channel = opts.channel;
this.id = this.channel.label;
Expand Down Expand Up @@ -77,7 +89,15 @@ export class WebRTCStream implements Stream {
if (this.readClosed || this.closed) {
return;
}
(this.source as Pushable<Uint8Array>).push(data);

let res: Uint8Array;
if (typeof data == 'string') {
res = fromString(data);
} else {
res = new Uint8Array(data as ArrayBuffer);
}
log.trace(`[stream:${this.id}][${this.stat.direction}] received message: length: ${res.length} ${res}`);
(this.source as Pushable<Uint8ArrayList>).push(new Uint8ArrayList(res));
};

this.channel.onclose = (_evt) => {
Expand Down Expand Up @@ -131,7 +151,7 @@ export class WebRTCStream implements Stream {
*/
closeRead(): void {
this.readClosed = true;
(this.source as Pushable<Uint8Array>).end();
(this.source as Pushable<Uint8ArrayList>).end();
if (this.readClosed && this.writeClosed) {
this.close();
}
Expand Down Expand Up @@ -160,5 +180,6 @@ export class WebRTCStream implements Stream {
*/
reset(): void {
this.close();
this.stat = defaultStat(this.stat.direction);
}
}
Loading

0 comments on commit 1ab83be

Please sign in to comment.