Skip to content

Commit

Permalink
Change send API to allow more correct transaction resolution
Browse files Browse the repository at this point in the history
  • Loading branch information
mqp committed Jan 10, 2018
1 parent e7d366e commit 8bd73aa
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 53 deletions.
45 changes: 23 additions & 22 deletions bundle.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,40 +16,40 @@ function JanusPluginHandle(session) {

/** Attaches this handle to the Janus server and sets its ID. **/
JanusPluginHandle.prototype.attach = function(plugin) {
var payload = { janus: "attach", plugin: plugin, "force-bundle": true, "force-rtcp-mux": true };
return this.session.send(payload).then(resp => {
var payload = { plugin: plugin, "force-bundle": true, "force-rtcp-mux": true };
return this.session.send("attach", payload).then(resp => {
this.id = resp.data.id;
return resp;
});
};

/** Detaches this handle. **/
JanusPluginHandle.prototype.detach = function() {
return this.send({ janus: "detach" });
return this.send("detach");
};

/**
* Sends a signal associated with this handle. Signals should be JSON-serializable objects. Returns a promise that will
* be resolved or rejected when a response to this signal is received, or when no response is received within the
* session timeout.
**/
JanusPluginHandle.prototype.send = function(signal) {
return this.session.send(Object.assign({ handle_id: this.id }, signal));
JanusPluginHandle.prototype.send = function(type, signal) {
return this.session.send(type, Object.assign({ handle_id: this.id }, signal));
};

/** Sends a plugin-specific message associated with this handle. **/
JanusPluginHandle.prototype.sendMessage = function(body) {
return this.send({ janus: "message", body: body });
return this.send("message", { body: body });
};

/** Sends a JSEP offer or answer associated with this handle. **/
JanusPluginHandle.prototype.sendJsep = function(jsep) {
return this.send({ janus: "message", body: {}, jsep: jsep });
return this.send("message", { body: {}, jsep: jsep });
};

/** Sends an ICE trickle candidate associated with this handle. **/
JanusPluginHandle.prototype.sendTrickle = function(candidate) {
return this.send({ janus: "trickle", candidate: candidate });
return this.send("trickle", { candidate: candidate });
};

/**
Expand All @@ -71,15 +71,15 @@ function JanusSession(output, options) {

/** Creates this session on the Janus server and sets its ID. **/
JanusSession.prototype.create = function() {
return this.send({ janus: "create" }).then(resp => {
return this.send("create").then(resp => {
this.id = resp.data.id;
return resp;
});
};

/** Destroys this session. **/
JanusSession.prototype.destroy = function() {
return this.send({ janus: "destroy" }).then(() => {
return this.send("destroy").then(() => {
this._killKeepalive();
});
};
Expand All @@ -105,16 +105,16 @@ JanusSession.prototype.receive = function(signal) {
console.debug("Incoming Janus signal: ", signal);
}
if (signal.transaction != null) {
var handlers = this.txns[signal.transaction];
if (signal.janus === "ack") {
// this is an ack of an asynchronously-processed request, we should wait
var txn = this.txns[signal.transaction];
if (signal.janus === "ack" && txn.type == "message") {
// this is an ack of an asynchronously-processed plugin request, we should wait
// to resolve the promise until the actual response comes in
} else if (handlers != null) {
if (handlers.timeout != null) {
clearTimeout(handlers.timeout);
} else if (txn != null) {
if (txn.timeout != null) {
clearTimeout(txn.timeout);
}
delete this.txns[signal.transaction];
(this.isError(signal) ? handlers.reject : handlers.resolve)(signal);
(this.isError(signal) ? txn.reject : txn.resolve)(signal);
}
}
};
Expand All @@ -124,11 +124,12 @@ JanusSession.prototype.receive = function(signal) {
* be resolved or rejected when a response to this signal is received, or when no response is received within the
* session timeout.
**/
JanusSession.prototype.send = function(signal) {
JanusSession.prototype.send = function(type, signal) {
var txid = (this.nextTxId++).toString();
signal = Object.assign({ janus: type, transaction: txid }, signal);
if (this.id != null) { // this.id is undefined in the special case when we're sending the session create message
signal = Object.assign({ session_id: this.id }, signal);
}
signal = Object.assign({ transaction: (this.nextTxId++).toString() }, signal);
if (module.exports.verbose) {
console.debug("Outgoing Janus signal: ", signal);
}
Expand All @@ -137,17 +138,17 @@ JanusSession.prototype.send = function(signal) {
if (this.options.timeoutMs) {
timeout = setTimeout(() => {
delete this.txns[signal.transaction];
reject(new Error("Signalling message timed out."));
reject(new Error("Signalling message with txid " + txid + " timed out."));
}, this.options.timeoutMs);
}
this.txns[signal.transaction] = { resolve: resolve, reject: reject, timeout: timeout };
this.txns[signal.transaction] = { resolve: resolve, reject: reject, timeout: timeout, type: type };
this.output(JSON.stringify(signal));
this._resetKeepalive();
});
};

JanusSession.prototype._keepalive = function() {
return this.send({ janus: "keepalive" });
return this.send("keepalive");
};

JanusSession.prototype._killKeepalive = function() {
Expand Down
45 changes: 23 additions & 22 deletions minijanus.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,40 +15,40 @@ function JanusPluginHandle(session) {

/** Attaches this handle to the Janus server and sets its ID. **/
JanusPluginHandle.prototype.attach = function(plugin) {
var payload = { janus: "attach", plugin: plugin, "force-bundle": true, "force-rtcp-mux": true };
return this.session.send(payload).then(resp => {
var payload = { plugin: plugin, "force-bundle": true, "force-rtcp-mux": true };
return this.session.send("attach", payload).then(resp => {
this.id = resp.data.id;
return resp;
});
};

/** Detaches this handle. **/
JanusPluginHandle.prototype.detach = function() {
return this.send({ janus: "detach" });
return this.send("detach");
};

/**
* Sends a signal associated with this handle. Signals should be JSON-serializable objects. Returns a promise that will
* be resolved or rejected when a response to this signal is received, or when no response is received within the
* session timeout.
**/
JanusPluginHandle.prototype.send = function(signal) {
return this.session.send(Object.assign({ handle_id: this.id }, signal));
JanusPluginHandle.prototype.send = function(type, signal) {
return this.session.send(type, Object.assign({ handle_id: this.id }, signal));
};

/** Sends a plugin-specific message associated with this handle. **/
JanusPluginHandle.prototype.sendMessage = function(body) {
return this.send({ janus: "message", body: body });
return this.send("message", { body: body });
};

/** Sends a JSEP offer or answer associated with this handle. **/
JanusPluginHandle.prototype.sendJsep = function(jsep) {
return this.send({ janus: "message", body: {}, jsep: jsep });
return this.send("message", { body: {}, jsep: jsep });
};

/** Sends an ICE trickle candidate associated with this handle. **/
JanusPluginHandle.prototype.sendTrickle = function(candidate) {
return this.send({ janus: "trickle", candidate: candidate });
return this.send("trickle", { candidate: candidate });
};

/**
Expand All @@ -70,15 +70,15 @@ function JanusSession(output, options) {

/** Creates this session on the Janus server and sets its ID. **/
JanusSession.prototype.create = function() {
return this.send({ janus: "create" }).then(resp => {
return this.send("create").then(resp => {
this.id = resp.data.id;
return resp;
});
};

/** Destroys this session. **/
JanusSession.prototype.destroy = function() {
return this.send({ janus: "destroy" }).then(() => {
return this.send("destroy").then(() => {
this._killKeepalive();
});
};
Expand All @@ -104,16 +104,16 @@ JanusSession.prototype.receive = function(signal) {
console.debug("Incoming Janus signal: ", signal);
}
if (signal.transaction != null) {
var handlers = this.txns[signal.transaction];
if (signal.janus === "ack") {
// this is an ack of an asynchronously-processed request, we should wait
var txn = this.txns[signal.transaction];
if (signal.janus === "ack" && txn.type == "message") {
// this is an ack of an asynchronously-processed plugin request, we should wait
// to resolve the promise until the actual response comes in
} else if (handlers != null) {
if (handlers.timeout != null) {
clearTimeout(handlers.timeout);
} else if (txn != null) {
if (txn.timeout != null) {
clearTimeout(txn.timeout);
}
delete this.txns[signal.transaction];
(this.isError(signal) ? handlers.reject : handlers.resolve)(signal);
(this.isError(signal) ? txn.reject : txn.resolve)(signal);
}
}
};
Expand All @@ -123,11 +123,12 @@ JanusSession.prototype.receive = function(signal) {
* be resolved or rejected when a response to this signal is received, or when no response is received within the
* session timeout.
**/
JanusSession.prototype.send = function(signal) {
JanusSession.prototype.send = function(type, signal) {
var txid = (this.nextTxId++).toString();
signal = Object.assign({ janus: type, transaction: txid }, signal);
if (this.id != null) { // this.id is undefined in the special case when we're sending the session create message
signal = Object.assign({ session_id: this.id }, signal);
}
signal = Object.assign({ transaction: (this.nextTxId++).toString() }, signal);
if (module.exports.verbose) {
console.debug("Outgoing Janus signal: ", signal);
}
Expand All @@ -136,17 +137,17 @@ JanusSession.prototype.send = function(signal) {
if (this.options.timeoutMs) {
timeout = setTimeout(() => {
delete this.txns[signal.transaction];
reject(new Error("Signalling message timed out."));
reject(new Error("Signalling message with txid " + txid + " timed out."));
}, this.options.timeoutMs);
}
this.txns[signal.transaction] = { resolve: resolve, reject: reject, timeout: timeout };
this.txns[signal.transaction] = { resolve: resolve, reject: reject, timeout: timeout, type: type };
this.output(JSON.stringify(signal));
this._resetKeepalive();
});
};

JanusSession.prototype._keepalive = function() {
return this.send({ janus: "keepalive" });
return this.send("keepalive");
};

JanusSession.prototype._killKeepalive = function() {
Expand Down
21 changes: 12 additions & 9 deletions tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ var test = require('tape');
test('transactions are detected and matched up', function(t) {
var session = new mj.JanusSession(signal => {}, { keepaliveMs: null });

var aq = session.send({ transaction: "figs" });
var bq = session.send({ transaction: "wigs" });
var cq = session.send({ transaction: "pigs" });
var trickle = session.send("trickle", { transaction: "bigs" });
var aq = session.send("message", { transaction: "figs" });
var bq = session.send("message", { transaction: "wigs" });
var cq = session.send("message", { transaction: "pigs" });

session.receive({ transaction: "bigs", janus: "ack" });
session.receive({ transaction: "figs", janus: "ack" });
session.receive({ transaction: "wigs", janus: "ack" });
session.receive({ transaction: "pigs", janus: "ack", hint: "Asynchronously processing some pigs." });
Expand All @@ -18,10 +20,11 @@ test('transactions are detected and matched up', function(t) {
session.receive({ transaction: "figs", cats: "hats" });
session.receive({ transaction: "wigs" });

Promise.all([aq, bq, cq]).then(results => {
t.deepEqual(results[0], { transaction: "figs", cats: "hats" });
t.deepEqual(results[1], { transaction: "wigs" });
t.deepEqual(results[2], { transaction: "pigs", rats: "pats" });
Promise.all([trickle, aq, bq, cq]).then(results => {
t.deepEqual(results[0], { transaction: "bigs", janus: "ack" });
t.deepEqual(results[1], { transaction: "figs", cats: "hats" });
t.deepEqual(results[2], { transaction: "wigs" });
t.deepEqual(results[3], { transaction: "pigs", rats: "pats" });
t.deepEqual(session.txns, {});
t.end();
});
Expand All @@ -30,11 +33,11 @@ test('transactions are detected and matched up', function(t) {
test('transaction timeouts happen', function(t) {
var session = new mj.JanusSession(signal => {}, { timeoutMs: 5, keepaliveMs: null });

var aq = session.send({ transaction: "lazy" }).then(
var aq = session.send("message", { transaction: "lazy" }).then(
resp => { t.fail("Request should have failed!"); return resp; },
err => { t.pass("Timeout should have fired!"); return err; }
);
var bq = session.send({ transaction: "hasty" }).then(
var bq = session.send("message", { transaction: "hasty" }).then(
resp => { t.pass("Request should have succeeded!"); return resp; },
err => { t.fail("Timeout shouldn't have fired!"); return err; }
);
Expand Down

0 comments on commit 8bd73aa

Please sign in to comment.