diff --git a/lib/pubsub/subscription.js b/lib/pubsub/subscription.js index 66a193161447..668dd2bb03f2 100644 --- a/lib/pubsub/subscription.js +++ b/lib/pubsub/subscription.js @@ -127,7 +127,7 @@ function Subscription(pubsub, options) { this.autoAck = util.is(options.autoAck, 'boolean') ? options.autoAck : false; this.closed = false; this.interval = util.is(options.interval, 'number') ? options.interval : 10; - this.inProgress = 0; + this.inProgressAckIds = {}; this.maxInProgress = util.is(options.maxInProgress, 'number') ? options.maxInProgress : Infinity; this.messageListeners = 0; @@ -243,7 +243,7 @@ Subscription.prototype.startPulling_ = function() { var maxResults; if (this.maxInProgress < Infinity) { - maxResults = this.maxInProgress - this.inProgress; + maxResults = this.maxInProgress - Object.keys(this.inProgressAckIds).length; } this.pull({ @@ -284,8 +284,10 @@ Subscription.prototype.ack = function(ackIds, callback) { 'At least one ID must be specified before it can be acknowledged.'); } + ackIds = util.arrayize(ackIds); + var body = { - ackIds: util.arrayize(ackIds) + ackIds: ackIds }; callback = callback || util.noop; @@ -293,7 +295,6 @@ Subscription.prototype.ack = function(ackIds, callback) { var path = this.name + ':acknowledge'; this.makeReq_('POST', path, null, body, function() { - self.inProgress--; self.refreshPausedStatus_(); callback.apply(self, arguments); }); @@ -391,9 +392,10 @@ Subscription.prototype.pull = function(options, callback) { } var messages = response.receivedMessages || []; - messages = messages.map(Subscription.formatMessage_); + messages = messages + .map(Subscription.formatMessage_) + .map(self.decorateMessage_.bind(self)); - self.inProgress += messages.length; self.refreshPausedStatus_(); if (self.autoAck && messages.length !== 0) { @@ -441,9 +443,25 @@ Subscription.prototype.setAckDeadline = function(options, callback) { this.makeReq_('POST', path, null, body, callback); }; +Subscription.prototype.decorateMessage_ = function(message) { + var self = this; + + this.inProgressAckIds[message.ackId] = true; + + message.ack = self.ack.bind(self, message.ackId); + + message.skip = function() { + delete self.inProgressAckIds[message.ackId]; + self.refreshPausedStatus_(); + }; + + return message; +}; + Subscription.prototype.refreshPausedStatus_ = function() { var isCurrentlyPaused = this.paused; - this.paused = this.inProgress >= this.maxInProgress; + var inProgress = Object.keys(this.inProgressAckIds).length; + this.paused = inProgress >= this.maxInProgress; if (isCurrentlyPaused && !this.paused && this.messageListeners > 0) { this.startPulling_();