Skip to content

Commit

Permalink
default to single pull
Browse files Browse the repository at this point in the history
  • Loading branch information
stephenplusplus committed Feb 12, 2015
1 parent b1db00a commit 336d210
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 23 deletions.
65 changes: 46 additions & 19 deletions lib/pubsub/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -149,16 +149,19 @@ Subscription.formatMessage_ = function(msg) {
var event = msg.pubsubEvent;

var message = {
ackId: msg.ackId,
id: event.message.messageId
ackId: msg.ackId
};

if (event.message.data) {
message.data = new Buffer(event.message.data, 'base64').toString('utf-8');
if (event && event.message) {
message.id = event.message.messageId;

try {
message.data = JSON.parse(message.data);
} catch(e) {}
if (event.message.data) {
message.data = new Buffer(event.message.data, 'base64').toString('utf-8');

try {
message.data = JSON.parse(message.data);
} catch(e) {}
}
}

return message;
Expand Down Expand Up @@ -221,6 +224,7 @@ Subscription.prototype.startPulling_ = function() {
return;
}
this.pull({
maxCount: 1,
returnImmediately: false
}, function(err, message) {
if (err) {
Expand Down Expand Up @@ -283,16 +287,16 @@ Subscription.prototype.delete = function(callback) {

/**
* Pull messages from the subscribed topic. If messages were found, your
* callback is executed with the message object.
*
* @todo Should not be racing with other pull.
* @todo Fix API to return a list of messages.
* callback is executed with an array of message objects.
*
* Note that messages are pulled automatically once you register your first
* event listener to the subscription, thus the call to `pull` is handled for
* you. If you don't want to start pulling, simply don't register a
* `subscription.on('message', function() {})` event handler.
*
* @todo Should not be racing with other pull.
* @todo Fix API to return a list of messages.
*
* @param {object=} options - Configuration object.
* @param {boolean} options.returnImmediately - If set, the system will respond
* immediately. Otherwise, wait until new messages are available. Returns if
Expand All @@ -301,15 +305,34 @@ Subscription.prototype.delete = function(callback) {
* @param {function} callback - The callback function.
*
* @example
* subscription.pull(function(err, message) {
* // message.id = ID used to acknowledge its receival.
* // message.data = Contents of the message.
* //-
* // Pull all available messages.
* //-
* subscription.pull(function(err, messages) {
* // messages = [
* // {
* // ackId: '', // ID used to acknowledge its receival.
* // id: '', // Unique message ID.
* // data: '' // Contents of the message.
* // },
* // // ...
* // ]
* });
*
* //-
* // Limit the results.
* // Pull a single message.
* //-
* subscription.pull({ maxCount: 3 }, function(err, messages) {});
* var opts = {
* maxCount: 1
* };
*
* subscription.pull(opts, function(err, message) {
* // message = {
* // ackId: '', // ID used to acknowledge its receival.
* // id: '', // Unique message ID.
* // data: '' // Contents of the message.
* // }
* });
*/
Subscription.prototype.pull = function(options, callback) {
var that = this;
Expand All @@ -319,15 +342,19 @@ Subscription.prototype.pull = function(options, callback) {
options = {};
}

var batch = util.is(options.maxCount, 'number');
var apiEndpoint = batch ? 'subscriptions/pullBatch' : 'subscriptions/pull';
if (!util.is(options.maxCount, 'number')) {
options.maxCount = 99999;
}

var single = options.maxCount === 1;
var apiEndpoint = single ? 'subscriptions/pull' : 'subscriptions/pullBatch';

var body = {
subscription: this.name,
returnImmediately: !!options.returnImmediately
};

if (batch) {
if (!single) {
body.maxEvents = options.maxCount;
}

Expand Down
27 changes: 24 additions & 3 deletions regression/pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,10 @@ describe('pubsub', function() {

topic.publish('hello', assert.ifError);

subscription.pull({ returnImmediately: true }, function(err, msg) {
subscription.pull({
returnImmediately: true,
maxCount: 1
}, function(err, msg) {
assert.ifError(err);
subscription.ack(msg.ackId, done);
});
Expand All @@ -188,6 +191,10 @@ describe('pubsub', function() {
var subscription = topic.subscription(subscriptions[0].name);

topic.publish([
{ data: new Buffer('hello').toString('base64') },
{ data: new Buffer('hello').toString('base64') },
{ data: new Buffer('hello').toString('base64') },
{ data: new Buffer('hello').toString('base64') },
{ data: new Buffer('hello').toString('base64') },
{ data: new Buffer('hello').toString('base64') },
{ data: new Buffer('hello').toString('base64') },
Expand All @@ -196,7 +203,10 @@ describe('pubsub', function() {
{ data: new Buffer('hello').toString('base64') }
], assert.ifError);

subscription.pull({ returnImmediately: true }, function(err, msg) {
subscription.pull({
returnImmediately: true,
maxCount: 1
}, function(err, msg) {
assert.ifError(err);
assert.equal(msg.data, 'hello');
subscription.ack(msg.ackId, done);
Expand All @@ -207,6 +217,10 @@ describe('pubsub', function() {
var subscription = topic.subscription(subscriptions[0].name);

topic.publish([
{ data: new Buffer('hello').toString('base64') },
{ data: new Buffer('hello').toString('base64') },
{ data: new Buffer('hello').toString('base64') },
{ data: new Buffer('hello').toString('base64') },
{ data: new Buffer('hello').toString('base64') },
{ data: new Buffer('hello').toString('base64') },
{ data: new Buffer('hello').toString('base64') },
Expand All @@ -215,7 +229,10 @@ describe('pubsub', function() {
{ data: new Buffer('hello').toString('base64') }
], assert.ifError);

subscription.pull({ returnImmediately: true }, function(err, msg) {
subscription.pull({
returnImmediately: true,
maxCount: 1
}, function(err, msg) {
assert.ifError(err);
assert.equal(msg.data, 'hello');
subscription.ack(msg.ackId, done);
Expand All @@ -227,6 +244,10 @@ describe('pubsub', function() {
var opts = { returnImmediately: true, maxCount: 3 };

topic.publish([
{ data: new Buffer('hello').toString('base64') },
{ data: new Buffer('hello').toString('base64') },
{ data: new Buffer('hello').toString('base64') },
{ data: new Buffer('hello').toString('base64') },
{ data: new Buffer('hello').toString('base64') },
{ data: new Buffer('hello').toString('base64') },
{ data: new Buffer('hello').toString('base64') },
Expand Down
12 changes: 11 additions & 1 deletion test/pubsub/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,16 @@ describe('Subscription', function() {
subscription.pull({ returnImmediately: true }, assert.ifError);
});

it('should default to batching', function(done) {
subscription.makeReq_ = function(method, path, query, body) {
assert.equal(path, 'subscriptions/pullBatch');
assert.equal(body.maxEvents, 99999);
done();
};

subscription.pull(assert.ifError);
});

describe('single pull', function() {
it('should make correct api request', function(done) {
subscription.makeReq_ = function(method, path, qs, body) {
Expand All @@ -215,7 +225,7 @@ describe('Subscription', function() {
done();
};

subscription.pull({}, assert.ifError);
subscription.pull({ maxCount: 1 }, assert.ifError);
});

it('should execute callback with a message', function(done) {
Expand Down

0 comments on commit 336d210

Please sign in to comment.