Skip to content

Commit

Permalink
Merge pull request #650 from stephenplusplus/spp--pubsub-add-maxInPro…
Browse files Browse the repository at this point in the history
…gress

add maxInProgress option
  • Loading branch information
callmehiphop committed Jul 17, 2015
2 parents f72a307 + e914998 commit c01e9fd
Show file tree
Hide file tree
Showing 2 changed files with 350 additions and 77 deletions.
116 changes: 109 additions & 7 deletions lib/pubsub/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ var util = require('../common/util.js');
* @param {number} options.interval - Interval in milliseconds to check for new
* messages. (default: 10)
* @param {string} options.name - Name of the subscription.
* @param {number} options.maxInProgress - Maximum messages to consume
* simultaneously.
*/
/**
* A Subscription object will give you access to your Google Cloud Pub/Sub
Expand Down Expand Up @@ -106,9 +108,18 @@ var util = require('../common/util.js');
* // Register a listener for `message` events.
* function onMessage(message) {
* // Called every time a message is received.
*
* // message.id = ID of the message.
* // message.ackId = ID used to acknowledge the message receival.
* // message.data = Contents of the message.
*
* // Ack the message:
* // message.ack(callback);
*
* // Skip the message. This is useful with `maxInProgress` option when
* // creating your subscription. This doesn't ack the message, but allows
* // more messages to be retrieved if your limit was hit.
* // message.skip();
* }
* subscription.on('message', onMessage);
*
Expand All @@ -125,6 +136,11 @@ function Subscription(pubsub, options) {
this.autoAck = util.is(options.autoAck, 'boolean') ? options.autoAck : false;
this.closed = false;
this.interval = util.is(options.interval, 'number') ? options.interval : 10;
this.inProgressAckIds = {};
this.maxInProgress =
util.is(options.maxInProgress, 'number') ? options.maxInProgress : Infinity;
this.messageListeners = 0;
this.paused = false;

this.listenForEvents_();
}
Expand Down Expand Up @@ -191,11 +207,10 @@ Subscription.formatMessage_ = function(msg) {
*/
Subscription.prototype.listenForEvents_ = function() {
var self = this;
var messageListeners = 0;

this.on('newListener', function(event) {
if (event === 'message') {
messageListeners++;
this.messageListeners++;
if (self.closed) {
self.closed = false;
}
Expand All @@ -204,7 +219,7 @@ Subscription.prototype.listenForEvents_ = function() {
});

this.on('removeListener', function(event) {
if (event === 'message' && --messageListeners === 0) {
if (event === 'message' && --this.messageListeners === 0) {
self.closed = true;
}
});
Expand All @@ -229,20 +244,31 @@ Subscription.prototype.listenForEvents_ = function() {
*/
Subscription.prototype.startPulling_ = function() {
var self = this;
if (this.closed) {

if (this.closed || this.paused) {
return;
}

var maxResults;

if (this.maxInProgress < Infinity) {
maxResults = this.maxInProgress - Object.keys(this.inProgressAckIds).length;
}

this.pull({
returnImmediately: false
returnImmediately: false,
maxResults: maxResults
}, function(err, messages, apiResponse) {
if (err) {
self.emit('error', err, apiResponse);
}

if (messages) {
messages.forEach(function(message) {
self.emit('message', message, apiResponse);
});
}

setTimeout(self.startPulling_.bind(self), self.interval);
});
};
Expand All @@ -260,16 +286,33 @@ Subscription.prototype.startPulling_ = function() {
* subscription.ack('ePHEESyhuE8e...', function(err, apiResponse) {});
*/
Subscription.prototype.ack = function(ackIds, callback) {
var self = this;

if (!ackIds || ackIds.length === 0) {
throw new Error(
'At least one ID must be specified before it can be acknowledged.');
}

ackIds = util.arrayize(ackIds);

var body = {
ackIds: ackIds
};

callback = callback || util.noop;

var path = this.name + ':acknowledge';
this.makeReq_('POST', path, null, body, callback || util.noop);

this.makeReq_('POST', path, null, body, function(err, resp) {
if (!err) {
ackIds.forEach(function(ackId) {
delete self.inProgressAckIds[ackId];
});
self.refreshPausedStatus_();
}

callback(err, resp);
});
};

/**
Expand Down Expand Up @@ -324,6 +367,10 @@ Subscription.prototype.delete = function(callback) {
* // id: '', // Unique message ID.
* // data: '', // Contents of the message.
* // attributes: {} // Attributes of the message.
* //
* // Helper functions:
* // ack(callback): // Ack the message.
* // skip(): // Free up 1 slot on the sub's maxInProgress value.
* // },
* // // ...
* // ]
Expand Down Expand Up @@ -364,7 +411,11 @@ Subscription.prototype.pull = function(options, callback) {
}

var messages = response.receivedMessages || [];
messages = messages.map(Subscription.formatMessage_);
messages = messages
.map(Subscription.formatMessage_)
.map(self.decorateMessage_.bind(self));

self.refreshPausedStatus_();

if (self.autoAck && messages.length !== 0) {
var ackIds = messages.map(function(message) {
Expand Down Expand Up @@ -411,4 +462,55 @@ Subscription.prototype.setAckDeadline = function(options, callback) {
this.makeReq_('POST', path, null, body, callback);
};

/**
* Add functionality on top of a message returned from the API, including the
* ability to `ack` and `skip` the message.
*
* This also records the message as being "in progress". See
* {module:subscription#refreshPausedStatus_}.
*
* @private
*
* @param {object} message - A message object.
* @return {object} message - The original message after being decorated.
* @param {function} message.ack - Ack the message.
* @param {function} message.skip - Increate the number of available messages to
* simultaneously receive.
*/
Subscription.prototype.decorateMessage_ = function(message) {
var self = this;

this.inProgressAckIds[message.ackId] = true;

message.ack = self.ack.bind(self, message.ackId);

message.skip = function() {
delete self.inProgressAckIds[message.ackId];
self.refreshPausedStatus_();
};

return message;
};

/**
* Update the status of `maxInProgress`. Å subscription becomes "paused" (not
* pulling) when the number of messages that have yet to be ack'd or skipped
* exceeds the user's specified `maxInProgress` value.
*
* This will start pulling when that event reverses: we were paused, but one or
* more messages were just ack'd or skipped, freeing up room for more messages
* to be consumed.
*
* @private
*/
Subscription.prototype.refreshPausedStatus_ = function() {
var isCurrentlyPaused = this.paused;
var inProgress = Object.keys(this.inProgressAckIds).length;
this.paused = inProgress >= this.maxInProgress;

if (isCurrentlyPaused && !this.paused && this.messageListeners > 0) {
this.startPulling_();
}
};

module.exports = Subscription;
Loading

0 comments on commit c01e9fd

Please sign in to comment.