Skip to content
This repository has been archived by the owner on Jun 19, 2023. It is now read-only.

Initial pass at Transport._connect #6

Merged
merged 9 commits into from
Aug 10, 2022
6 changes: 5 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"type": "module",
"scripts": {
"build": "aegir build",
"clean": "rm -rfv node_modules dist *.lock *-lock.json ",
"test": "aegir test",
"format": "prettier --write src/*.ts"
},
Expand All @@ -19,10 +20,13 @@
"typescript": "^4.7.4"
},
"dependencies": {
"@chainsafe/libp2p-noise": "../js-libp2p-noise",
"@libp2p/components": "^2.0.1",
"@libp2p/interface-transport": "^1.0.2",
"@libp2p/interface-connection": "^3.0.0",
"@libp2p/interface-transport": "^1.0.3",
"@libp2p/interfaces": "^3.0.3",
"@libp2p/logger": "^2.0.0",
"@libp2p/peer-id": "^1.1.15",
"abortable-iterator": "^4.0.2",
"it-merge": "^1.0.4",
"p-defer": "^4.0.0",
Expand Down
29 changes: 15 additions & 14 deletions src/connection.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import { Connection } from '@libp2p/interface-connection';
import { ConnectionStat } from '@libp2p/interface-connection';
import { Stream, Direction } from '@libp2p/interface-connection';
import * as ic from '@libp2p/interface-connection';
import { PeerId } from '@libp2p/interface-peer-id';
import { AbortOptions } from '@libp2p/interfaces';
import { logger } from '@libp2p/logger';
Expand All @@ -13,44 +11,47 @@ type ConnectionInit = {
id: string;
localPeer: PeerId;
localAddr?: Multiaddr;
remotePeer: PeerId;
remoteAddr: Multiaddr;
direction: Direction;
direction: ic.Direction;
tags?: string[];
stat: ConnectionStat;
pc: RTCPeerConnection;
credential_string: string;
remotePeerId: PeerId;
};

export class WebRTCConnection implements Connection {
export class WebRTCConnection implements ic.Connection {
id: string;
stat: ConnectionStat;
stat: ic.ConnectionStat;
remoteAddr: Multiaddr;
remotePeer: PeerId;
tags: string[] = [];
streams: Stream[] = [];
direction: Direction;
streams: ic.Stream[] = [];
direction: ic.Direction;

private peerConnection: RTCPeerConnection;
private ufrag: string;

constructor(init: ConnectionInit) {
this.streams = [];
this.remotePeer = init.remotePeer;
this.remoteAddr = init.remoteAddr;
this.stat = init.stat;
this.id = init.id;
this.direction = init.direction;
this.peerConnection = init.pc;
this.ufrag = init.credential_string;
this.stat = {
direction: 'outbound',
timeline: { open: 0 },
status: 'CLOSED',
};
this.remotePeer = init.remotePeerId;
// for muxing incoming stream
// this._peerConnection.ondatachannel = ({ channel }) => {
// let stream = DataChannelStream(channel)
// this.addStream(stream)
// }
}

async newStream(multicodecs: string | string[], options?: AbortOptions): Promise<Stream> {
async newStream(multicodecs: string | string[], options?: AbortOptions): Promise<ic.Stream> {
// let label = uuid.v4()
// let dc = this._peerConnection.createDataChannel(label, {})
// await datachannel opening
Expand All @@ -60,7 +61,7 @@ export class WebRTCConnection implements Connection {
throw new Error('not implemented');
}

addStream(stream: Stream): void {
addStream(stream: ic.Stream): void {
throw new Error('not implemented');
}
removeStream(id: string): void {
Expand Down
7 changes: 4 additions & 3 deletions src/sdp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,9 @@ export function fromMultiAddr(ma: Multiaddr, ufrag: string): RTCSessionDescripti
};
}

export function munge(desc: RTCSessionDescription, ufrag: string) {
export function munge(desc: RTCSessionDescriptionInit, ufrag: string): RTCSessionDescriptionInit {
//TODO
desc.sdp.replaceAll(/^a=ice-ufrag=(.*)/, 'a=ice-ufrag=' + ufrag);
desc.sdp.replaceAll(/^a=ice-pwd=(.*)/, 'a=ice-pwd=' + ufrag);
// desc.sdp.replaceAll(/^a=ice-ufrag=(.*)/, 'a=ice-ufrag=' + ufrag);
// desc.sdp.replaceAll(/^a=ice-pwd=(.*)/, 'a=ice-pwd=' + ufrag);
return desc;
}
9 changes: 5 additions & 4 deletions src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { Sink } from 'it-stream-types';
import { pushable, Pushable } from 'it-pushable';
import defer, { DeferredPromise } from 'p-defer';
import merge from 'it-merge';
import { Uint8ArrayList } from 'uint8arraylist'

// const log = logger('libp2p:webrtc:connection');

Expand All @@ -32,8 +33,8 @@ export class WebRTCStream implements Stream {
metadata: Record<string, any>;
private readonly channel: RTCDataChannel;

source: Source<Uint8Array> = pushable();
sink: Sink<Uint8Array, Promise<void>>;
source: Source<Uint8ArrayList> = pushable();
sink: Sink<Uint8ArrayList | Uint8Array, Promise<void>>;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ckousik - please look at these changes, to be in compliance with the latest Stream interface

src/stream.ts:36:3 - error TS2416: Property 'sink' in type 'WebRTCStream' is not assignable to the same property in base type 'Stream'.
  Type 'Sink<Uint8Array, Promise<void>>' is not assignable to type 'Sink<Uint8ArrayList | Uint8Array, Promise<void>>'.
    Type 'Uint8ArrayList | Uint8Array' is not assignable to type 'Uint8Array'.
      Type 'Uint8ArrayList' is missing the following properties from type 'Uint8Array': BYTES_PER_ELEMENT, buffer, byteOffset, copyWithin, and 21 more.

36   sink: Sink<Uint8Array, Promise<void>>;


// promises
opened: DeferredPromise<void> = defer();
Expand Down Expand Up @@ -89,7 +90,7 @@ export class WebRTCStream implements Stream {
};
}

private async _sinkFn(src: Source<Uint8Array>): Promise<void> {
private async _sinkFn(src: Source<Uint8ArrayList | Uint8Array>): Promise<void> {
await this.opened.promise;
if (closed || this.writeClosed) {
return;
Expand All @@ -107,7 +108,7 @@ export class WebRTCStream implements Stream {
if (closed || this.writeClosed) {
break;
}
this.channel.send(buf);
this.channel.send(buf.subarray());
}
}

Expand Down
73 changes: 50 additions & 23 deletions src/transport.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
import * as sdp from './sdp';
import * as p from '@libp2p/peer-id';
import { WebRTCConnection } from './connection';
import { WebRTCDialOptions } from './options';
import { WebRTCStream } from './stream';
import { Noise, stablelib } from '@chainsafe/libp2p-noise';
import { Components } from '@libp2p/components';
import { Connection } from '@libp2p/interface-connection';
import { CreateListenerOptions } from '@libp2p/interface-transport';
import { Listener, Transport } from '@libp2p/interface-transport';
import { DialOptions, symbol } from '@libp2p/interface-transport';
import { CreateListenerOptions, DialOptions, Listener, symbol, Transport } from '@libp2p/interface-transport';
import { logger } from '@libp2p/logger';
import { Multiaddr } from '@multiformats/multiaddr';
import { v4 as genUuid } from 'uuid';
import defer from 'p-defer';

const log = logger('libp2p:webrtc:transport');
const utf8 = new TextEncoder();

export class WebRTCTransport implements Transport {
private components: Components = new Components();

async dial(ma: Multiaddr, options: DialOptions): Promise<Connection> {
const rawConn = this._connect(ma, options);
log('new outbound connection %s', rawConn, genUuid());
Expand All @@ -32,50 +40,69 @@ export class WebRTCTransport implements Transport {
return true;
}

todo_cb() {}

_connect(ma: Multiaddr, options: WebRTCDialOptions) {
//let peerConnection = new RTCPeerConnection();
async _connect(ma: Multiaddr, options: WebRTCDialOptions) {
let peerConnection = new RTCPeerConnection();
// create data channel
// let handshakeChannel = peerConnection.createDataChannel("data", { negotiated: true, id: 1 })
let handshakeDataChannel = peerConnection.createDataChannel('data', { negotiated: true, id: 1 });
// let handshakeChannel = peerConnection.createDataChannel("data", { id: 1 })
//
//
// create offer sdp
// console.log(offerSdp)
let offerSdp = await peerConnection.createOffer();
console.log(offerSdp);
//
//
// generate random string for ufrag
//
let ufrag = genUuid();
//
//
// munge sdp with ufrag = pwd
//
offerSdp = sdp.munge(offerSdp, ufrag);
//
//
// set local description
//
peerConnection.setLocalDescription(offerSdp);
//
//
// construct answer sdp from multiaddr
let answerSdp = sdp.fromMultiAddr(ma, ufrag);
//
//
//
// set remote description
peerConnection.setRemoteDescription(answerSdp);
//
//
//
// wait for peerconnection.onopen to fire, or for the datachannel to open
// openPromise = new Promise((res, rej) => {
// dc.onopen = res
// setTimeout(rej, 10000)
// })
// await openPromise
//
// do noise handshake + webrtc handshake as described in spec
//
//
// return Connection(peerconnection, initoptions)
throw new Error('not implemented');
let dataChannelOpenPromise = defer();
handshakeDataChannel.onopen = (_) => dataChannelOpenPromise.resolve();
setTimeout(dataChannelOpenPromise.reject, 10000);
await dataChannelOpenPromise;

let myPeerId = this.components.getPeerId();
let rps = ma.getPeerId();
if (!rps) {
throw new Error('TODO Do we really need a peer ID ?');
}
let theirPeerId = p.peerIdFromString(rps);

// do noise handshake
//set the Noise Prologue to libp2p-webrtc-noise:<FINGERPRINTS> before starting the actual Noise handshake.
// <FINGERPRINTS> is the concatenation of the of the two TLS fingerprints of A and B in their multihash byte representation, sorted in ascending order.
let fingerprintsPrologue = [myPeerId.multihash, theirPeerId.multihash].sort().join('');
let noise = new Noise(myPeerId.privateKey, undefined, stablelib, utf8.encode(fingerprintsPrologue));
let wrappedChannel = new WebRTCStream({ channel: handshakeDataChannel, stat: {direction: 'outbound', timeline: {open: 0}} });
await noise.secureOutbound(myPeerId, wrappedChannel, theirPeerId);

return new WebRTCConnection({
id: ma.toString(),
remoteAddr: ma,
localPeer: myPeerId,
direction: 'outbound',
pc: peerConnection,
credential_string: ufrag,
remotePeerId: theirPeerId,
});
}
}