Skip to content

Commit

Permalink
Merge pull request #148 from megawac/streaming
Browse files Browse the repository at this point in the history
Make registering as a subscriber or publisher an option for the streaming API
  • Loading branch information
rctoris committed Dec 15, 2014
2 parents 984e4bd + 14a5309 commit 60938fb
Showing 1 changed file with 23 additions and 8 deletions.
31 changes: 23 additions & 8 deletions src/node/TopicStream.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,19 @@ var DuplexStream = require('stream').Duplex;
* Publish a connected ROS topic to a duplex
* stream. This stream can be piped to, which will
* publish to the topic
*
* @options
* * subscribe: whether to subscribe to the topic and start emitting
* Data
* * publish: whether to register the stream as a publisher to the topic
* * transform: a function to change the data to be published
* or filter it if false is returned
*/
Topic.prototype.toStream = function(transform) {
Topic.prototype.toStream = function(options) {
options = options || {subscribe: true, publish: true};

var topic = this;
var hasTransform = typeof transform === 'function';
var hasTransform = typeof options.transform === 'function';

var stream = new DuplexStream({
objectMode: true
Expand All @@ -18,18 +27,24 @@ Topic.prototype.toStream = function(transform) {
// Publish to the topic if someone pipes to stream
stream._write = function(chunk, encoding, callback) {
if (hasTransform) {
chunk = transform(chunk);
chunk = options.transform(chunk);
}
if (chunk) {
if (chunk === false) {
topic.publish(chunk);
}
callback();
};

this.subscribe(function(message) {
stream.push(message);
});
this.on('unsubscribe', stream.push.bind(stream, null));
if (options.subscribe) {
this.subscribe(function(message) {
stream.push(message);
});
this.on('unsubscribe', stream.push.bind(stream, null));
}

if (options.publish) {
this.advertise();
}

return stream;
};
Expand Down

0 comments on commit 60938fb

Please sign in to comment.