Skip to content

Commit

Permalink
Trying to make sense of PeerPouch.
Browse files Browse the repository at this point in the history
My idea is to use QR codes to exchange (some form of) SDP -- QR codes as signalling! :*)
  • Loading branch information
shimaore committed Nov 13, 2013
1 parent 2c63ef6 commit 14c349e
Show file tree
Hide file tree
Showing 5 changed files with 530 additions and 0 deletions.
125 changes: 125 additions & 0 deletions src/peerpouch/PeerConnectionHandler.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
var RTCPeerConnection, RTCSessionDescription, RTCIceCandidate;

function PeerConnectionHandler_init() {
if(RTCPeerConnection) return;
RTCPeerConnection = window.mozRTCPeerConnection || window.RTCPeerConnection || window.webkitRTCPeerConnection,
RTCSessionDescription = window.mozRTCSessionDescription || window.RTCSessionDescription || window.webkitRTCSessionDescription,
RTCIceCandidate = window.mozRTCIceCandidate || window.RTCIceCandidate || window.webkitRTCIceCandidate;
}

function PeerConnectionHandler(opts) {
PeerConnectionHandler_init();
opts.reliable = true;
var cfg = (opts.cfg) ? opts.cfg : {"iceServers":[{"url":"stun:23.21.150.121"}]};
var con = (opts.reliable) ? {} : { 'optional': [{'RtpDataChannels': true }] };

this._rtc = new RTCPeerConnection(cfg, con);

this.LOG_SELF = opts._self;
this.LOG_PEER = opts._peer;
this._channel = null;

this.onhavesignal = null; // caller MUST provide this
this.onreceivemessage = null; // caller SHOULD provide this
this.onconnection = null; // …and maybe this

var handler = this, rtc = this._rtc;
if (opts.initiate)
this._setupChannel();
else
rtc.ondatachannel = this._setupChannel.bind(this);

// on negotiation needed
rtc.onnegotiationneeded = function (evt) {
if (handler.DEBUG) {
console.log(handler.LOG_SELF, "saw negotiation trigger and will create an offer");
}
rtc.createOffer(function (offerDesc) {
if (handler.DEBUG) {
console.log(handler.LOG_SELF, "created offer, sending to", handler.LOG_PEER);
}
rtc.setLocalDescription(offerDesc);
handler._sendSignal(offerDesc);
}, function (e) { console.warn(handler.LOG_SELF, "failed to create offer", e); });
};

// on ICE candidate
rtc.onicecandidate = function (evt) {
if (evt.candidate) handler._sendSignal({candidate:evt.candidate});
};

// debugging
rtc.onicechange = function (evt) {
if (handler.DEBUG) {
console.log(handler.LOG_SELF, "ICE change", rtc.iceGatheringState, rtc.iceConnectionState);
}
};

rtc.onstatechange = function (evt) {
if (handler.DEBUG) {
console.log(handler.LOG_SELF, "State change", rtc.signalingState, rtc.readyState);
}
};
}

PeerConnectionHandler.prototype._sendSignal = function (data) {
if (!this.onhavesignal) throw Error("Need to send message but `onhavesignal` handler is not set.");
this.onhavesignal({target:this, signal:JSON.parse(JSON.stringify(data))});
};

PeerConnectionHandler.prototype.receiveSignal = function (data) {
var handler = this, rtc = this._rtc;
if (handler.DEBUG) console.log(this.LOG_SELF, "got data", data, "from", this.LOG_PEER);

if (data.sdp) rtc.setRemoteDescription(new RTCSessionDescription(data), function () {
var needsAnswer = (rtc.remoteDescription.type == 'offer');
if (handler.DEBUG) console.log(handler.LOG_SELF, "set offer, now creating answer:", needsAnswer);
if (needsAnswer) rtc.createAnswer(function (answerDesc) {
if (handler.DEBUG) console.log(handler.LOG_SELF, "got anwer, sending back to", handler.LOG_PEER);
rtc.setLocalDescription(answerDesc);
handler._sendSignal(answerDesc);
}, function (e) { console.warn(handler.LOG_SELF, "couldn't create answer", e); });
}, function (e) { console.warn(handler.LOG_SELF, "couldn't set remote description", e) });
else if (data.candidate) try { rtc.addIceCandidate(new RTCIceCandidate(data.candidate)); } catch (e) { console.error("Couldn't add candidate", e); }
};

PeerConnectionHandler.prototype.sendMessage = function (data) {
if (!this._channel || this._channel.readyState !== 'open') throw Error("Connection exists, but data channel is not open.");
this._channel.send(data);
};

PeerConnectionHandler.prototype._setupChannel = function (evt) {
var handler = this, rtc = this._rtc;
if (evt) if (handler.DEBUG) console.log(this.LOG_SELF, "received data channel", evt.channel.readyState);
this._channel = (evt) ? evt.channel : rtc.createDataChannel('peerpouch-dev');
// NOTE: in Chrome (M32) `this._channel.binaryType === 'arraybuffer'` instead of blob
this._channel.onopen = function (evt) {
if (handler.DEBUG) console.log(handler.LOG_SELF, "DATA CHANNEL IS OPEN", handler._channel);
if (handler.onconnection) handler.onconnection(handler._channel); // BOOM!
};
this._channel.onmessage = function (evt) {
if (handler.DEBUG) console.log(handler.LOG_SELF, "received message!", evt);
if (handler.onreceivemessage) handler.onreceivemessage({target:handler, data:evt.data});
};
if (window.mozRTCPeerConnection) setTimeout(function () {
rtc.onnegotiationneeded(); // FF doesn't trigger this for us like Chrome does
}, 0);
window.dbgChannel = this._channel;
};

PeerConnectionHandler.prototype._tube = function () { // TODO: refactor PeerConnectionHandler to simply be the "tube" itself
var tube = {},
handler = this;
tube.onmessage = null;
tube.send = function (data) {
handler.sendMessage(data);
};
handler.onreceivemessage = function (evt) {
if (tube.onmessage) tube.onmessage(evt);
};
return tube;
};

if(module && module.exports) {
module.exports = PeerConnectionHandler;
}
121 changes: 121 additions & 0 deletions src/peerpouch/PeerPouch.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
var Pouch = require('pouchdb');
var RPCHandler = require("./RPCHandler");

Pouch.Errors.FORBIDDEN = {status:403, error:'forbidden', reason:"The request was refused"};
var PeerPouch = function(opts, handler, callback) {

var api = {}; // initialized later, but Pouch makes us return this before it's ready

handler.onconnection = function () {

var rpc = new RPCHandler(handler._tube());
rpc.onbootstrap = function (d) { // share will bootstrap
var rpcAPI = d.api;

// simply hook up each [proxied] remote method as our own local implementation
Object.keys(rpcAPI).forEach(function (k) { api[k]= rpcAPI[k]; });

// one override to provide a synchronous `.cancel()` helper locally
api._changes = function (opts) {
if (opts.onChange) opts.onChange._keep_exposed = true; // otherwise the RPC mechanism tosses after one use
var cancelRemotely = null,
cancelledLocally = false;
rpcAPI._changes(opts, function (rpcCancel) {
if (cancelledLocally) rpcCancel();
else cancelRemotely = rpcCancel;
});
return {cancel:function () {
if (cancelRemotely) cancelRemotely();
else cancelledLocally = true;
if (opts.onChange) delete opts.onChange._keep_exposed; // allow for slight chance of cleanup [if called again]
}};
};

api._id = function () {
// TODO: does this need to be "mangled" to distinguish it from the real copy?
// [it seems unnecessary: a replication with "api" is a replication with "rpcAPI"]
return rpcAPI._id;
};

// now our api object is *actually* ready for use
if (callback) callback(null, api);
};
};

return api;
};

PeerPouch.bootstrap = function(handler,db) {
var rpc = new RPCHandler(handler._tube());
rpc.bootstrap({
api: PeerPouch._wrappedAPI(db)
});
}

PeerPouch._wrappedAPI = function (db) {
/*
This object will be sent over to the remote peer. So, all methods on it must be:
- async-only (all "communication" must be via callback, not exceptions or return values)
- secure (peer could provide untoward arguments)
*/
var rpcAPI = {};


/*
This lists the core "customApi" methods that are expected by pouch.adapter.js
*/
var methods = ['bulkDocs', '_getRevisionTree', '_doCompaction', '_get', '_getAttachment', '_allDocs', '_changes', '_close', '_info', '_id'];

// most methods can just be proxied directly
methods.forEach(function (k) {
rpcAPI[k] = db[k];
if (rpcAPI[k]) rpcAPI[k]._keep_exposed = true;
});

// one override, to pass the `.cancel()` helper via callback to the synchronous override on the other side
rpcAPI._changes = function (opts, rpcCB) {
var retval = db._changes(opts);
rpcCB(retval.cancel);
}
rpcAPI._changes._keep_exposed = true;

// just send the local result
rpcAPI._id = db.id();

return rpcAPI;
};

// Don't bother letting peers nuke each others' databases
PeerPouch.destroy = function(name, callback) {
if (callback) setTimeout(function () { callback(Pouch.Errors.FORBIDDEN); }, 0);
};

// Can we breathe in this environment?
PeerPouch.valid = function() {
// TODO: check for WebRTC+DataConnection support
return true;
};


PeerPouch._types = {
presence: 'com.stemstorage.peerpouch.presence',
signal: 'com.stemstorage.peerpouch.signal',
share: 'com.stemstorage.peerpouch.share'
}
var _t = PeerPouch._types; // local alias for brevitation…

// Register for our scheme
Pouch.adapter('webrtc', PeerPouch);

// Debug
Pouch.dbgPeerPouch = PeerPouch;

// Implements the API for dealing with a PouchDB peer's database over WebRTC
PeerPouch._shareInitializersByName = Object.create(null);
PeerPouch._init = function(opts, callback) {
var _init = PeerPouch._shareInitializersByName[opts.name];
if (!_init) throw Error("Unknown PeerPouch share dbname"); // TODO: use callback instead?

var handler = _init(opts);
return PeerPouch(opts,handler,callback);
}
111 changes: 111 additions & 0 deletions src/peerpouch/RPCHandler.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
function RPCHandler(tube) {
this.onbootstrap = null; // caller MAY provide this

this._exposed_fns = Object.create(null);
this.serialize = function (obj) {
var messages = [];
messages.push(JSON.stringify(obj, function (k,v) {
if (typeof v === 'function') {
var id = Math.random().toFixed(20).slice(2);
this._exposed_fns[id] = v;
return {__remote_fn:id};
} else if (Object.prototype.toString.call(v) === '[object IDBTransaction]') {
// HACK: pouch.idb.js likes to bounce a ctx object around but if we null it out it recreates
// c.f. https://github.com/daleharvey/pouchdb/commit/e7f66a02509bd2a9bd12369c87e6238fadc13232
return;

// TODO: the WebSQL adapter also does this but does NOT create a new transaction if it's missing :-(
// https://github.com/daleharvey/pouchdb/blob/80514c7d655453213f9ca7113f327424969536c4/src/adapters/pouch.websql.js#L646
// so we'll have to either get that fixed upstream or add remote object references (but how to garbage collect? what if local uses?!)
} else if (_isBlob(v)) {
var n = messages.indexOf(v) + 1;
if (!n) n = messages.push(v);
return {__blob:n};
} else return v;
}.bind(this)));
return messages;
};

var blobsForNextCall = []; // each binary object is sent before the function call message
this.deserialize = function (data) {
if (typeof data === 'string') {
return JSON.parse(data, function (k,v) {
if (v && v.__remote_fn) return function () {
this._callRemote(v.__remote_fn, arguments);
}.bind(this);
else if (v && v.__blob) {
var b = blobsForNextCall[v.__blob-1];
if (!_isBlob(b)) b = new Blob([b]); // `b` may actually be an ArrayBuffer
return b;
}
else return v;
}.bind(this));
blobsForNextCall.length = 0;
} else blobsForNextCall.push(data);
};

function _isBlob(obj) {
var type = Object.prototype.toString.call(obj);
return (type === '[object Blob]' || type === '[object File]');
}

this._callRemote = function (fn, args) {
// console.log("Serializing RPC", fn, args);
var messages = this.serialize({
fn: fn,
args: Array.prototype.slice.call(args)
});
if (window.mozRTCPeerConnection) messages.forEach(function (msg) { tube.send(msg); });
else processNext();
// WORKAROUND: Chrome (as of M32) cannot send a Blob, only an ArrayBuffer. So we send each once converted…
function processNext() {
var msg = messages.shift();
if (!msg) return;
if (_isBlob(msg)) {
var r = new FileReader();
r.readAsArrayBuffer(msg);
r.onload = function () {
tube.send(r.result);
processNext();
}
} else {
tube.send(msg);
processNext();
}
}
};

this._exposed_fns['__BOOTSTRAP__'] = function () {
if (this.onbootstrap) this.onbootstrap.apply(this, arguments);
}.bind(this);


tube.onmessage = function (evt) {
var call = this.deserialize(evt.data);
if (!call) return; //

var fn = this._exposed_fns[call.fn];
if (!fn) {
console.warn("RPC call to unknown local function", call);
return;
}

// leak only callbacks which are marked for keeping (most are one-shot)
if (!fn._keep_exposed) delete this._exposed_fns[call.fn];

try {
// console.log("Calling RPC", fn, call.args);
fn.apply(null, call.args);
} catch (e) { // we do not signal exceptions remotely
console.warn("Local RPC invocation unexpectedly threw: "+e, e);
}
}.bind(this);
}

RPCHandler.prototype.bootstrap = function () {
this._callRemote('__BOOTSTRAP__', arguments);
};

if( module && module.exports ) {
module.exports = RPCHandler;
}
Loading

0 comments on commit 14c349e

Please sign in to comment.