Skip to content

Commit

Permalink
always use publishBatch
Browse files Browse the repository at this point in the history
  • Loading branch information
stephenplusplus committed Feb 19, 2015
1 parent 0f84cdd commit e00d32f
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 228 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,9 @@ pubsub.createTopic('my-new-topic', function(err, topic) {});
var topic = pubsub.topic('my-existing-topic');

// Publish a message to the topic.
topic.publish('New message!', function(err) {});
topic.publish({
data: 'New message!'
}, function(err) {});

// Subscribe to the topic.
topic.subscribe('new-subscription', function(err, subscription) {
Expand Down
5 changes: 5 additions & 0 deletions lib/common/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,14 @@ module.exports.extendGlobalConfig = extendGlobalConfig;
* // [ 'Hi' ]
*/
function arrayize(input) {
if (!input) {
return [];
}

if (!Array.isArray(input)) {
return [input];
}

return input;
}

Expand Down
128 changes: 34 additions & 94 deletions lib/pubsub/topic.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,11 @@ function Topic(pubsub, options) {
* @return {object}
*/
Topic.formatMessage_ = function(message) {
if (!message.data) {
message = {
data: new Buffer(message).toString('base64')
};
if (!util.is(message.data, 'buffer')) {
message.data = new Buffer(JSON.stringify(message.data));
}

if (!util.is(message.data, 'string') && !util.is(message.data, 'buffer')) {
message.data = JSON.stringify(message.data);
}
message.data = message.data.toString('base64');

return message;
};
Expand All @@ -104,74 +100,55 @@ Topic.formatName_ = function(projectId, name) {
*
* @throws {Error} If no message is provided.
*
* @param {*} message - The message to publish.
* @param {object|object[]} message - The message(s) to publish.
* @param {*} message.data - The contents of the message in any format.
* @param {array=} message.labels - Labels to apply to the message.
* @param {function=} callback - The callback function.
*
* @example
* //-
* // You may publish a simple string message.
* //-
* var simpleMessage = 'new user';
* topic.publish(simpleMessage, function(err) {});
* var registrationMessage = {
* data: {
* userId: 3,
* name: 'Stephen',
* event: 'new user'
* },
* labels: [
* 'registration'
* ]
* };
* topic.publish(registrationMessage, function(err) {});
*
* //-
* // Other times, you may need more information than a string can provide. Use
* // an object.
* // You can publish a batch of messages at once by supplying an array.
* //-
* var objectMessage = {
* userId: 3,
* name: 'Stephen',
* event: 'new user'
* var purchaseMessage = {
* data: {
* userId: 3,
* product: 'computer',
* event: 'purchase'
* }
* };
* topic.publish(objectMessage, function(err) {});
*
* //-
* // You can also publish a batch of messages at once by supplying an array.
* //-
* topic.publish([ simpleMessage, objectMessage ], function(err) {});
* topic.publish([
* registrationMessage,
* purchaseMessage
* ], function(err) {});
*/
Topic.prototype.publish = function(message, callback) {
if (!message) {
throw new Error('Cannot publish an empty message.');
}

callback = callback || util.noop;

if (util.is(message, 'array')) {
this.publishBatch_(message, callback);
} else {
this.publishRaw(Topic.formatMessage_(message), callback);
}
};
Topic.prototype.publish = function(messages, callback) {
messages = util.arrayize(messages);

/**
* Publish a raw message.
*
* @throws {Error} If no message is provided.
*
* @param {object} message - Raw message to publish.
* @param {array=} message.label - List of labels for the message.
* @param {string} message.data - The base64-encoded contents of the message.
* @param {function=} callback - The callback function.
*
* @example
* topic.publishRaw({
* data: new Buffer('New message!').toString('base64')
* }, function(err) {});
*/
Topic.prototype.publishRaw = function(message, callback) {
if (!message) {
throw new Error('Cannot publish an empty message.');
if (messages.length === 0) {
throw new Error('Cannot publish without a message.');
}

callback = callback || util.noop;

var body = {
topic: this.name,
message: message
messages: messages.map(Topic.formatMessage_)
};

this.makeReq_('POST', 'topics/publish', null, body, callback);
this.makeReq_('POST', 'topics/publishBatch', null, body, callback);
};

/**
Expand Down Expand Up @@ -306,41 +283,4 @@ Topic.prototype.subscription = function(name, options) {
return new Subscription(this.pubsub, options);
};

/**
* Publish a batch of messages at once.
*
* @param {array} messages - The array of messages.
* @param {function} callback - The callback function.
*
* @private
*
* @example
* var messages = [
* // The simple string format is accepted...
* 'user registered',
*
* // ...as are raw message objects.
* {
* label: 'registration',
* data: new Buffer('new user: stephen').toString('base64')
* }
* ];
*
* topic.publishBatch_(messages, function(err) {});
*/
Topic.prototype.publishBatch_ = function(messages, callback) {
if (!messages) {
throw new Error('Cannot publish without an array of messages.');
}

callback = callback || util.noop;

var body = {
topic: this.name,
messages: util.arrayize(messages).map(Topic.formatMessage_)
};

this.makeReq_('POST', 'topics/publishBatch', null, body, callback);
};

module.exports = Topic;
174 changes: 41 additions & 133 deletions test/pubsub/topic.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,25 +90,21 @@ describe('Topic', function() {
var messageString = 'string';
var messageBuffer = new Buffer(messageString);

var messageObjectWithString = { data: messageBuffer };
var messageObjectWithBuffer = { data: messageString };

var messageRaw = { data: messageBuffer.toString('base64') };

it('should handle strings', function() {
Topic.formatMessage_(messageString, messageRaw);
});

it('should handle buffers', function() {
Topic.formatMessage_(messageBuffer, messageRaw);
var messageObjectWithString = { data: messageString };
var messageObjectWithBuffer = { data: messageBuffer };

it('should handle string data', function() {
assert.deepEqual(
Topic.formatMessage_(messageObjectWithString),
{ data: new Buffer(JSON.stringify(messageString)).toString('base64') }
);
});

it('should handle objects with string messages', function() {
Topic.formatMessage_(messageObjectWithString, messageRaw);
});

it('should handle objects with buffer messages', function() {
Topic.formatMessage_(messageObjectWithBuffer, messageRaw);
it('should handle buffer data', function() {
assert.deepEqual(
Topic.formatMessage_(messageObjectWithBuffer),
{ data: messageBuffer.toString('base64') }
);
});
});

Expand All @@ -126,131 +122,43 @@ describe('Topic', function() {
});
});

describe('publishing', function() {
describe('publish', function() {
var message = 'howdy';
var messageRaw = { data: new Buffer(message).toString('base64') };

describe('publish', function() {
it('should throw if no message is provided', function() {
assert.throws(function() {
topic.publish();
}, /empty message/);
});

describe('single message', function() {
beforeEach(function() {
topic.publishRaw = util.noop;
});

it('should invoke publishRaw with a string', function(done) {
topic.publishRaw = function(message) {
assert.deepEqual(message, messageRaw);
done();
};

topic.publish(message, assert.ifError);
});
var messageObject = { data: message };

it('should pass callback', function(done) {
topic.publishRaw = function(message, callback) {
callback();
};
it('should throw if no message is provided', function() {
assert.throws(function() {
topic.publish();
}, /Cannot publish/);

topic.publish(message, done);
});
});

describe('array of messages', function() {
var messageArray = ['message1', 'message2'];

beforeEach(function() {
topic.publishBatch_ = util.noop;
});

it('should invoke publishBatch_ with an array', function(done) {
topic.publishBatch_ = function(messages) {
assert.deepEqual(messages, messageArray);
done();
};

topic.publish(messageArray, assert.ifError);
});

it('should pass callback', function(done) {
topic.publishBatch_ = function(msg, callback) {
callback();
};

topic.publish(messageArray, done);
});
});
assert.throws(function() {
topic.publish([]);
}, /Cannot publish/);
});

describe('publishBatch_', function() {
var messageArray = [
'user registered',
{ data: new Buffer('new user: stephen').toString('base64') }
];

it('should throw if no messages are provided', function() {
assert.throws(function() {
topic.publishBatch_();
}, /without an array/);
});

it('should send correct api request', function(done) {
topic.makeReq_ = function(method, path, query, body) {
assert.equal(method, 'POST');
assert.equal(path, 'topics/publishBatch');
assert.strictEqual(query, null);
assert.deepEqual(body, {
topic: topic.name,
messages: messageArray.map(Topic.formatMessage_)
});
done();
};

topic.publishBatch_(messageArray, assert.ifError);
});

it('should execute callback', function(done) {
topic.makeReq_ = function(method, path, query, body, callback) {
callback();
};
it('should send correct api request', function(done) {
topic.makeReq_ = function(method, path, query, body) {
assert.equal(method, 'POST');
assert.equal(path, 'topics/publishBatch');
assert.strictEqual(query, null);
assert.deepEqual(body, {
topic: topic.name,
messages: [
{ data: new Buffer(JSON.stringify(message)).toString('base64') }
]
});
done();
};

topic.publishBatch_(messageArray, done);
});
topic.publish(messageObject, assert.ifError);
});

describe('publishRaw', function() {
it('should throw if no message is provided', function() {
assert.throws(function() {
topic.publishRaw();
}, /empty message/);
});

it('should send correct api request', function(done) {
topic.makeReq_ = function(method, path, query, body) {
assert.equal(method, 'POST');
assert.equal(path, 'topics/publish');
assert.strictEqual(query, null);
assert.deepEqual(body, {
topic: topic.name,
message: Topic.formatMessage_(messageRaw)
});
done();
};

topic.publishRaw(messageRaw, assert.ifError);
});

it('should execute callback', function(done) {
topic.makeReq_ = function(method, path, query, body, callback) {
callback();
};
it('should execute callback', function(done) {
topic.makeReq_ = function(method, path, query, body, callback) {
callback();
};

topic.publishBatch_(messageRaw, done);
});
topic.publish(messageObject, done);
});
});

Expand Down

0 comments on commit e00d32f

Please sign in to comment.