diff --git a/README.md b/README.md index cbdd27b1..b6c65693 100644 --- a/README.md +++ b/README.md @@ -97,6 +97,7 @@ Each consumer is an [`EventEmitter`](http://nodejs.org/api/events.html) and emit |Event|Params|Description| |-----|------|-----------| -|`error`|`err`|Fired when an error occurs interacting with the queue or processing the message.| +|`error`|`err`|Fired when an error occurs interacting with the queue.| +|`processing_error`|`err`|Fired when an error occurs processing the message.| |`message_received`|`message`|Fired when a message is received.| |`message_processed`|`message`|Fired when a message is successfully processed and removed from the queue.| diff --git a/index.js b/index.js index 237008ea..54347fd4 100644 --- a/index.js +++ b/index.js @@ -10,6 +10,15 @@ var requiredOptions = [ 'handleMessage' ]; +/** + * Construct a new SQSError + */ +function SQSError(message) { + this.name = 'SQSError'; + this.message = (message || ''); +} +SQSError.prototype = Error.prototype; + function validate(options) { requiredOptions.forEach(function (option) { if (!options[option]) { @@ -97,7 +106,7 @@ Consumer.prototype._poll = function () { }; Consumer.prototype._handleSqsResponse = function (err, response) { - if (err) this.emit('error', new Error('SQS receive message failed: ' + err.message)); + if (err) this.emit('error', new SQSError('SQS receive message failed: ' + err.message)); var consumer = this; @@ -128,11 +137,14 @@ Consumer.prototype._processMessage = function (message, cb) { } ], function (err) { if (err) { - consumer.emit('error', err); + if (err.name === 'SQSError') { + consumer.emit('error', err); + } else { + consumer.emit('processing_error', err); + } } else { consumer.emit('message_processed', message); } - cb(); }); }; @@ -145,7 +157,7 @@ Consumer.prototype._deleteMessage = function (message, cb) { debug('Deleting message %s', message.MessageId); this.sqs.deleteMessage(deleteParams, function (err) { - if (err) return cb(new Error('SQS delete message failed: ' + err.message)); + if (err) return cb(new SQSError('SQS delete message failed: ' + err.message)); cb(); }); diff --git a/test/index.js b/test/index.js index ac2ea2d9..9cddf684 100644 --- a/test/index.js +++ b/test/index.js @@ -120,7 +120,7 @@ describe('Consumer', function () { handleMessage.yields(processingErr); - consumer.on('error', function (err) { + consumer.on('processing_error', function (err) { assert.equal(err, processingErr); done(); }); @@ -174,7 +174,7 @@ describe('Consumer', function () { it('doesn\'t delete the message when a processing error is reported', function () { handleMessage.yields(new Error('Processing error')); - consumer.on('error', function () { + consumer.on('processing_error', function () { // ignore the error });