diff --git a/lib/pubsub/subscription.js b/lib/pubsub/subscription.js index 1d5360c1f77e..7b45b53c9f2f 100644 --- a/lib/pubsub/subscription.js +++ b/lib/pubsub/subscription.js @@ -149,16 +149,19 @@ Subscription.formatMessage_ = function(msg) { var event = msg.pubsubEvent; var message = { - ackId: msg.ackId, - id: event.message.messageId + ackId: msg.ackId }; - if (event.message.data) { - message.data = new Buffer(event.message.data, 'base64').toString('utf-8'); + if (event && event.message) { + message.id = event.message.messageId; - try { - message.data = JSON.parse(message.data); - } catch(e) {} + if (event.message.data) { + message.data = new Buffer(event.message.data, 'base64').toString('utf-8'); + + try { + message.data = JSON.parse(message.data); + } catch(e) {} + } } return message; @@ -221,6 +224,7 @@ Subscription.prototype.startPulling_ = function() { return; } this.pull({ + maxCount: 1, returnImmediately: false }, function(err, message) { if (err) { @@ -283,16 +287,16 @@ Subscription.prototype.delete = function(callback) { /** * Pull messages from the subscribed topic. If messages were found, your - * callback is executed with the message object. - * - * @todo Should not be racing with other pull. - * @todo Fix API to return a list of messages. + * callback is executed with an array of message objects. * * Note that messages are pulled automatically once you register your first * event listener to the subscription, thus the call to `pull` is handled for * you. If you don't want to start pulling, simply don't register a * `subscription.on('message', function() {})` event handler. * + * @todo Should not be racing with other pull. + * @todo Fix API to return a list of messages. + * * @param {object=} options - Configuration object. * @param {boolean} options.returnImmediately - If set, the system will respond * immediately. Otherwise, wait until new messages are available. Returns if @@ -301,15 +305,34 @@ Subscription.prototype.delete = function(callback) { * @param {function} callback - The callback function. * * @example - * subscription.pull(function(err, message) { - * // message.id = ID used to acknowledge its receival. - * // message.data = Contents of the message. + * //- + * // Pull all available messages. + * //- + * subscription.pull(function(err, messages) { + * // messages = [ + * // { + * // ackId: '', // ID used to acknowledge its receival. + * // id: '', // Unique message ID. + * // data: '' // Contents of the message. + * // }, + * // // ... + * // ] * }); * * //- - * // Limit the results. + * // Pull a single message. * //- - * subscription.pull({ maxCount: 3 }, function(err, messages) {}); + * var opts = { + * maxCount: 1 + * }; + * + * subscription.pull(opts, function(err, message) { + * // message = { + * // ackId: '', // ID used to acknowledge its receival. + * // id: '', // Unique message ID. + * // data: '' // Contents of the message. + * // } + * }); */ Subscription.prototype.pull = function(options, callback) { var that = this; @@ -319,15 +342,19 @@ Subscription.prototype.pull = function(options, callback) { options = {}; } - var batch = util.is(options.maxCount, 'number'); - var apiEndpoint = batch ? 'subscriptions/pullBatch' : 'subscriptions/pull'; + if (!util.is(options.maxCount, 'number')) { + options.maxCount = 99999; + } + + var single = options.maxCount === 1; + var apiEndpoint = single ? 'subscriptions/pull' : 'subscriptions/pullBatch'; var body = { subscription: this.name, returnImmediately: !!options.returnImmediately }; - if (batch) { + if (!single) { body.maxEvents = options.maxCount; } diff --git a/regression/pubsub.js b/regression/pubsub.js index 5705b49d4e2a..f6c17366ae65 100644 --- a/regression/pubsub.js +++ b/regression/pubsub.js @@ -178,7 +178,10 @@ describe('pubsub', function() { topic.publish('hello', assert.ifError); - subscription.pull({ returnImmediately: true }, function(err, msg) { + subscription.pull({ + returnImmediately: true, + maxCount: 1 + }, function(err, msg) { assert.ifError(err); subscription.ack(msg.ackId, done); }); @@ -188,6 +191,10 @@ describe('pubsub', function() { var subscription = topic.subscription(subscriptions[0].name); topic.publish([ + { data: new Buffer('hello').toString('base64') }, + { data: new Buffer('hello').toString('base64') }, + { data: new Buffer('hello').toString('base64') }, + { data: new Buffer('hello').toString('base64') }, { data: new Buffer('hello').toString('base64') }, { data: new Buffer('hello').toString('base64') }, { data: new Buffer('hello').toString('base64') }, @@ -196,7 +203,10 @@ describe('pubsub', function() { { data: new Buffer('hello').toString('base64') } ], assert.ifError); - subscription.pull({ returnImmediately: true }, function(err, msg) { + subscription.pull({ + returnImmediately: true, + maxCount: 1 + }, function(err, msg) { assert.ifError(err); assert.equal(msg.data, 'hello'); subscription.ack(msg.ackId, done); @@ -207,6 +217,10 @@ describe('pubsub', function() { var subscription = topic.subscription(subscriptions[0].name); topic.publish([ + { data: new Buffer('hello').toString('base64') }, + { data: new Buffer('hello').toString('base64') }, + { data: new Buffer('hello').toString('base64') }, + { data: new Buffer('hello').toString('base64') }, { data: new Buffer('hello').toString('base64') }, { data: new Buffer('hello').toString('base64') }, { data: new Buffer('hello').toString('base64') }, @@ -215,7 +229,10 @@ describe('pubsub', function() { { data: new Buffer('hello').toString('base64') } ], assert.ifError); - subscription.pull({ returnImmediately: true }, function(err, msg) { + subscription.pull({ + returnImmediately: true, + maxCount: 1 + }, function(err, msg) { assert.ifError(err); assert.equal(msg.data, 'hello'); subscription.ack(msg.ackId, done); @@ -227,6 +244,10 @@ describe('pubsub', function() { var opts = { returnImmediately: true, maxCount: 3 }; topic.publish([ + { data: new Buffer('hello').toString('base64') }, + { data: new Buffer('hello').toString('base64') }, + { data: new Buffer('hello').toString('base64') }, + { data: new Buffer('hello').toString('base64') }, { data: new Buffer('hello').toString('base64') }, { data: new Buffer('hello').toString('base64') }, { data: new Buffer('hello').toString('base64') }, diff --git a/test/pubsub/subscription.js b/test/pubsub/subscription.js index e87f9b77e54f..5189b4cf8127 100644 --- a/test/pubsub/subscription.js +++ b/test/pubsub/subscription.js @@ -205,6 +205,16 @@ describe('Subscription', function() { subscription.pull({ returnImmediately: true }, assert.ifError); }); + it('should default to batching', function(done) { + subscription.makeReq_ = function(method, path, query, body) { + assert.equal(path, 'subscriptions/pullBatch'); + assert.equal(body.maxEvents, 99999); + done(); + }; + + subscription.pull(assert.ifError); + }); + describe('single pull', function() { it('should make correct api request', function(done) { subscription.makeReq_ = function(method, path, qs, body) { @@ -215,7 +225,7 @@ describe('Subscription', function() { done(); }; - subscription.pull({}, assert.ifError); + subscription.pull({ maxCount: 1 }, assert.ifError); }); it('should execute callback with a message', function(done) {