Skip to content

Commit

Permalink
Merge pull request #1 from BBC/batch
Browse files Browse the repository at this point in the history
Feature: Add batchSize to enable multiple messages to be consumed at once
  • Loading branch information
robinjmurphy committed Jan 14, 2015
2 parents 97975f3 + 5b62ccf commit dff7b66
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 67 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ app.start();
* The queue is polled continuously for messages using [long polling](http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-long-polling.html).
* Messages are deleted from the queue once `done()` is called.
* Calling `done(err)` with an error object will cause the message to be left on the queue. An [SQS redrive policy](http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/SQSDeadLetterQueue.html) can be used to move messages that cannot be processed to a dead letter queue.
* By default messages are processed one at a time – a new message won't be received until the first one has been processed. To process messages in parallel, use the `batchSize` option [detailed below](#options).

## API

Expand All @@ -47,7 +48,7 @@ Creates a new SQS consumer.
* `queueUrl` - _String_ - The SQS queue URL
* `region` - _String_ - The AWS region
* `handleMessage` - _Function_ - A function to be called whenever a message is receieved. Receives an SQS message object as its first argument and a function to call when the message has been handled as its second argument (i.e. `handleMessage(message, done)`).
* `waitTime` - _Number_ - An optional time in milliseconds to wait after recieving a message before requesting another one. This enables you to throttle the rate at which messages will be received. (default `100`);
* `batchSize` - _Number_ - The number of messages to request from SQS when polling (default `1`). This cannot be higher than the AWS limit of 10.
* `sqs` - _Object_ - An optional [AWS SQS](http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/SQS.html) object to use if you need to configure the client manually

### `consumer.start()`
Expand Down
61 changes: 38 additions & 23 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
var EventEmitter = require('events').EventEmitter;
var util = require('util');
var _ = require('lodash');
var async = require('async');
var AWS = require('aws-sdk');
var debug = require('debug')('sqs-consumer');
var requiredOptions = [
Expand All @@ -15,6 +16,10 @@ function validate(options) {
throw new Error('Missing SQS consumer option [' + option + '].');
}
});

if (options.batchSize > 10 || options.batchSize < 1) {
throw new Error('SQS batchSize option must be between 1 and 10.');
}
}

/**
Expand All @@ -23,7 +28,7 @@ function validate(options) {
* @param {string} options.queueUrl
* @param {string} options.region
* @param {function} options.handleMessage
* @param {number} options.waitTime
* @param {number} options.batchSize
* @param {object} options.sqs
*/
function Consumer(options) {
Expand All @@ -32,10 +37,10 @@ function Consumer(options) {
this.queueUrl = options.queueUrl;
this.handleMessage = options.handleMessage;
this.stopped = true;
this.batchSize = options.batchSize || 1;
this.sqs = options.sqs || new AWS.SQS({
region: options.region
});
this.poll = _.throttle(this._poll.bind(this), options.waitTime || 100);
}

util.inherits(Consumer, EventEmitter);
Expand All @@ -47,7 +52,7 @@ Consumer.prototype.start = function () {
if (this.stopped) {
debug('Starting consumer');
this.stopped = false;
this.poll();
this._poll();
}
};

Expand All @@ -62,7 +67,7 @@ Consumer.prototype.stop = function () {
Consumer.prototype._poll = function () {
var receiveParams = {
QueueUrl: this.queueUrl,
MaxNumberOfMessages: 1,
MaxNumberOfMessages: this.batchSize,
WaitTimeSeconds: 20
};

Expand All @@ -75,42 +80,52 @@ Consumer.prototype._poll = function () {
Consumer.prototype._handleSqsResponse = function (err, response) {
if (err) this.emit('error', err);

var consumer = this;

debug('Received SQS response');
debug(response);
if (response && response.Messages && response.Messages.length > 0) {
var message = response.Messages[0];

this.emit('message_received', message);
this._handleSqsMessage(message);
if (response && response.Messages && response.Messages.length > 0) {
async.each(response.Messages, this._processMessage.bind(this), function () {
// start polling again once all of the messages have been processed
consumer._poll();
});
} else {
// there were no messages, so start polling again
this._poll();
}

// Poll for another message
this.poll();
};

Consumer.prototype._handleSqsMessage = function (message) {
Consumer.prototype._processMessage = function (message, cb) {
var consumer = this;

this.handleMessage(message, function (err) {
if (err) return consumer.emit('error', err);
this.emit('message_received', message);
async.series([
function handleMessage(done) {
consumer.handleMessage(message, done);
},
function deleteMessage(done) {
consumer._deleteMessage(message, done);
}
], function (err) {
if (err) {
consumer.emit('error', err);
} else {
consumer.emit('message_processed', message);
}

consumer._deleteMessage(message);
cb();
});
};

Consumer.prototype._deleteMessage = function (message) {
var consumer = this;
Consumer.prototype._deleteMessage = function (message, cb) {
var deleteParams = {
QueueUrl: this.queueUrl,
ReceiptHandle: message.ReceiptHandle
};

debug('Deleting message %s', message.MessageId);
this.sqs.deleteMessage(deleteParams, function (err) {
if (err) return consumer.emit('error', err);

consumer.emit('message_processed', message);
});
this.sqs.deleteMessage(deleteParams, cb);
};

module.exports = Consumer;
module.exports = Consumer;
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "sqs-consumer",
"version": "1.1.1",
"version": "2.0.0",
"description": "Build SQS-based Node applications without the boilerplate",
"main": "index.js",
"scripts": {
Expand All @@ -26,6 +26,7 @@
"sinon": "^1.10.3"
},
"dependencies": {
"async": "^0.9.0",
"aws-sdk": "^2.0.23",
"debug": "^2.1.0",
"lodash": "^2.4.1"
Expand Down
157 changes: 115 additions & 42 deletions test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,15 @@ describe('Consumer', function () {
};

beforeEach(function () {
handleMessage = sinon.stub();
handleMessage = sinon.stub().yieldsAsync(null);
sqs = sinon.mock();
sqs.receiveMessage = sinon.stub().yields(null, response);
sqs.receiveMessage = sinon.stub().yieldsAsync(null, response);
sqs.receiveMessage.onSecondCall().returns();
sqs.deleteMessage = sinon.stub().yields(null);
sqs.deleteMessage = sinon.stub().yieldsAsync(null);
consumer = new Consumer({
queueUrl: 'some-queue-url',
region: 'some-region',
handleMessage: handleMessage,
waitTime: 10,
sqs: sqs
});
});
Expand Down Expand Up @@ -56,47 +55,29 @@ describe('Consumer', function () {
});
});

describe('.start', function () {
it('calls the handleMessage function when a message is received', function () {
consumer.start();

sinon.assert.calledWith(handleMessage, response.Messages[0]);
});

it('deletes the message when the handleMessage callback is called', function () {
handleMessage.yields(null);

consumer.start();

sinon.assert.calledWith(sqs.deleteMessage, {
QueueUrl: 'some-queue-url',
ReceiptHandle: 'receipt-handle'
it('requires the batchSize option to be no greater than 10', function () {
assert.throws(function () {
new Consumer({
region: 'some-region',
queueUrl: 'some-queue-url',
handleMessage: handleMessage,
batchSize: 11
});
});
});

it('doesn\'t delete the message when a processing error is reported', function () {
handleMessage.yields(new Error('Processing error'));

consumer.on('error', function () {
// ignore the error
it('requires the batchSize option to be greater than 0', function () {
assert.throws(function () {
new Consumer({
region: 'some-region',
queueUrl: 'some-queue-url',
handleMessage: handleMessage,
batchSize: -1
});

consumer.start();

sinon.assert.notCalled(sqs.deleteMessage);
});

it('waits before consuming new messages', function (done) {
sqs.receiveMessage.onSecondCall().yields(null, response);

consumer.start();

setTimeout(function () {
sinon.assert.calledTwice(handleMessage);
done();
}, 11);
});
});

describe('.start', function () {
it('fires an error event when an error occurs receiving a message', function (done) {
var receiveErr = new Error('Receive error');

Expand Down Expand Up @@ -157,26 +138,118 @@ describe('Consumer', function () {
consumer.start();
});

it('doesn\'t consumer more messages when called multiple times', function () {
it('calls the handleMessage function when a message is received', function (done) {
consumer.start();

consumer.on('message_processed', function () {
sinon.assert.calledWith(handleMessage, response.Messages[0]);
done();
});
});

it('deletes the message when the handleMessage callback is called', function (done) {
handleMessage.yields(null);

consumer.start();

consumer.on('message_processed', function () {
sinon.assert.calledWith(sqs.deleteMessage, {
QueueUrl: 'some-queue-url',
ReceiptHandle: 'receipt-handle'
});
done();
});
});

it('doesn\'t delete the message when a processing error is reported', function () {
handleMessage.yields(new Error('Processing error'));

consumer.on('error', function () {
// ignore the error
});

consumer.start();

sinon.assert.notCalled(sqs.deleteMessage);
});

it('consumes another message once one is processed', function (done) {
sqs.receiveMessage.onSecondCall().yields(null, response);
sqs.receiveMessage.onThirdCall().returns();

consumer.start();
setTimeout(function () {
sinon.assert.calledTwice(handleMessage);
done();
}, 10);
});

it('doesn\'t consume more messages when called multiple times', function () {
sqs.receiveMessage = sinon.stub().returns();
consumer.start();
consumer.start();
consumer.start();
consumer.start();
consumer.start();

sinon.assert.calledOnce(sqs.receiveMessage);
});

it('consumes multiple messages when the batchSize is greater than 1', function (done) {
sqs.receiveMessage.yieldsAsync(null, {
Messages: [
{
ReceiptHandle: 'receipt-handle-1',
MessageId: '1',
Body: 'body-1'
},
{
ReceiptHandle: 'receipt-handle-2',
MessageId: '2',
Body: 'body-2'
},
{
ReceiptHandle: 'receipt-handle-3',
MessageId: '3',
Body: 'body-3'
}
]
});

consumer = new Consumer({
queueUrl: 'some-queue-url',
region: 'some-region',
handleMessage: handleMessage,
batchSize: 3,
sqs: sqs
});

consumer.start();

setTimeout(function () {
sinon.assert.calledWith(sqs.receiveMessage, {
QueueUrl: 'some-queue-url',
MaxNumberOfMessages: 3,
WaitTimeSeconds: 20
});
sinon.assert.callCount(handleMessage, 3);
done();
}, 10);
});
});

describe('.stop', function () {
it('stops the consumer polling for messages', function (done) {
sqs.receiveMessage.onSecondCall().yields(null, response);
sqs.receiveMessage.onSecondCall().yieldsAsync(null, response);
sqs.receiveMessage.onThirdCall().returns();

consumer.start();
consumer.stop();

setTimeout(function () {
sinon.assert.calledOnce(handleMessage);
done();
}, consumer.waitTime + 1);
}, 10);
});
});
});

0 comments on commit dff7b66

Please sign in to comment.