Skip to content

Commit

Permalink
add message decorator
Browse files Browse the repository at this point in the history
  • Loading branch information
stephenplusplus committed Jul 10, 2015
1 parent 832fc66 commit 9fb635d
Showing 1 changed file with 25 additions and 7 deletions.
32 changes: 25 additions & 7 deletions lib/pubsub/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ function Subscription(pubsub, options) {
this.autoAck = util.is(options.autoAck, 'boolean') ? options.autoAck : false;
this.closed = false;
this.interval = util.is(options.interval, 'number') ? options.interval : 10;
this.inProgress = 0;
this.inProgressAckIds = {};
this.maxInProgress =
util.is(options.maxInProgress, 'number') ? options.maxInProgress : Infinity;
this.messageListeners = 0;
Expand Down Expand Up @@ -243,7 +243,7 @@ Subscription.prototype.startPulling_ = function() {
var maxResults;

if (this.maxInProgress < Infinity) {
maxResults = this.maxInProgress - this.inProgress;
maxResults = this.maxInProgress - Object.keys(this.inProgressAckIds).length;
}

this.pull({
Expand Down Expand Up @@ -284,16 +284,17 @@ Subscription.prototype.ack = function(ackIds, callback) {
'At least one ID must be specified before it can be acknowledged.');
}

ackIds = util.arrayize(ackIds);

var body = {
ackIds: util.arrayize(ackIds)
ackIds: ackIds
};

callback = callback || util.noop;

var path = this.name + ':acknowledge';

this.makeReq_('POST', path, null, body, function() {
self.inProgress--;
self.refreshPausedStatus_();
callback.apply(self, arguments);
});
Expand Down Expand Up @@ -391,9 +392,10 @@ Subscription.prototype.pull = function(options, callback) {
}

var messages = response.receivedMessages || [];
messages = messages.map(Subscription.formatMessage_);
messages = messages
.map(Subscription.formatMessage_)
.map(self.decorateMessage_.bind(self));

self.inProgress += messages.length;
self.refreshPausedStatus_();

if (self.autoAck && messages.length !== 0) {
Expand Down Expand Up @@ -441,9 +443,25 @@ Subscription.prototype.setAckDeadline = function(options, callback) {
this.makeReq_('POST', path, null, body, callback);
};

Subscription.prototype.decorateMessage_ = function(message) {
var self = this;

this.inProgressAckIds[message.ackId] = true;

message.ack = self.ack.bind(self, message.ackId);

message.skip = function() {
delete self.inProgressAckIds[message.ackId];
self.refreshPausedStatus_();
};

return message;
};

Subscription.prototype.refreshPausedStatus_ = function() {
var isCurrentlyPaused = this.paused;
this.paused = this.inProgress >= this.maxInProgress;
var inProgress = Object.keys(this.inProgressAckIds).length;
this.paused = inProgress >= this.maxInProgress;

if (isCurrentlyPaused && !this.paused && this.messageListeners > 0) {
this.startPulling_();
Expand Down

0 comments on commit 9fb635d

Please sign in to comment.