Skip to content

Commit

Permalink
Proper end of stream signaling to rpcImpl, see #529
Browse files Browse the repository at this point in the history
  • Loading branch information
dcodeIO committed Dec 8, 2016
1 parent e4faf7f commit 9ea3766
Show file tree
Hide file tree
Showing 9 changed files with 53 additions and 23 deletions.
23 changes: 16 additions & 7 deletions dist/protobuf.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion dist/protobuf.js.map

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions dist/protobuf.min.js

Large diffs are not rendered by default.

Binary file modified dist/protobuf.min.js.gz
Binary file not shown.
2 changes: 1 addition & 1 deletion dist/protobuf.min.js.map

Large diffs are not rendered by default.

22 changes: 17 additions & 5 deletions scripts/rpc-test.js → examples/streaming-rpc.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,17 @@ var Greeter = root.lookup("Greeter"),
Hello = root.lookup("Hello"),
World = root.lookup("World");

var ended = false;

// Implement: Stream-aware RPC implementation
function rpcImpl(method, requestData, callback) {
if (ended)
return;
if (!requestData) {
console.log("rpc ended client-side.");
ended = true;
return;
}
setTimeout(function() {
try {
// <exemplary server side code>
Expand Down Expand Up @@ -63,12 +73,14 @@ greeter.on("error", function(err, method) {
console.log("error:", err);
});

greeter.sayHello({ name: 'node' });
greeter.sayHello({ name: 'protobuf' });
greeter.sayHello({ name: 'paralin' });
greeter.sayHello({ name: 'protocol' });
greeter.sayHello({ name: 'buffers' });
greeter.sayHello({ name: 'for' });

setTimeout(function() {
greeter.end();
greeter.sayHello({ name: 'dcode' }); // does nothing
// ^ Signals rpcImpl that the service has been ended client-side by calling it with a null buffer.
// Likewise, rpcImpl can end the stream by calling its callback with an explicit null buffer.

greeter.sayHello({ name: 'javascript' }); // does nothing
}, 1000);
// Likewise, the RPC impl can end the stream by calling its callback with an explicit null message
15 changes: 11 additions & 4 deletions src/rpc/service.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ function Service(rpcImpl) {
EventEmitter.call(this);

/**
* RPC implementation.
* @type {RPCImpl}
* RPC implementation. Becomes `null` when the service is ended.
* @type {?RPCImpl}
*/
this.$rpc = rpcImpl;
}
Expand All @@ -27,8 +27,15 @@ ServicePrototype.constructor = Service;

/**
* Ends this service and emits the `end` event.
* @param {boolean} [endedByRPC=false] Whether the service has been ended by the RPC implementation.
* @returns {rpc.Service} `this`
*/
ServicePrototype.end = function end() {
return this.emit('end').off();
ServicePrototype.end = function end(endedByRPC) {
if (this.$rpc) {
if (!endedByRPC) // signal end to rpcImpl
this.$rpc(null, null, null);
this.$rpc = null;
this.emit('end').off();
}
return this;
};
4 changes: 3 additions & 1 deletion src/service.js
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ ServicePrototype.create = function create(rpcImpl, requestDelimited, responseDel
this.getMethodsArray().forEach(function(method) {
var lcName = method.name.substring(0, 1).toLowerCase() + method.name.substring(1);
rpcService[lcName] = function(request, callback) {
if (!rpcService.$rpc) // already ended?
return;
method.resolve();
var requestData;
try {
Expand All @@ -178,7 +180,7 @@ ServicePrototype.create = function create(rpcImpl, requestDelimited, responseDel
return callback ? callback(err) : undefined;
}
if (responseData === null) {
rpcService.emit('end', method);
rpcService.end(/* endedByRPC */ true);
return undefined;
}
var response;
Expand Down
2 changes: 1 addition & 1 deletion src/util/eventemitter.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ EventEmitterPrototype.off = function off(evt, fn) {
};

/**
* Emits an event.
* Emits an event by calling its listeners with the specified arguments.
* @param {string} evt Event name
* @param {...*} args Arguments
* @returns {util.EventEmitter} `this`
Expand Down

0 comments on commit 9ea3766

Please sign in to comment.