A minimal queue library for node.js, backed by redis. Meant for high-throughput queues. Built for graceful shutdown.
One way to build large applications is to not build large applications at all, and instead split the application into small discrete logical pieces. A common way to talk between such micro-applications is to use message queues.
Cumin is a library to put items into a queue, and to 'listen' to the queue for new entries. Ordinarily, you would use either one of the two methods in your application.
Inspired by Resque and Kue. Adapted to fit node's style of doing multiple things simultaneously in the same process, and to work for high-throughput message scenarios.
This library is in heavy use at Errorception.
In app.js
:
var cumin = require("cumin")();
// When, say, signup is complete...
cumin.enqueue("mailer", {
to: "newuser@example.com",
type: "signup"
});
In mailer.js
:
var cumin = require("cumin")();
cumin.listen("mailer", function(mailJob, done) {
// mailJob looks as follows:
//
// {
// to: "newuser@example.com",
// type: "signup"
// }
//
// Remember to call 'done' when you've finished the task.
done();
});
An example can also be found in the examples
directory. node examples/enqueue.js
writes a messages every 10ms to a redis queue. node examples/listen.js
pops from the queue and simulates a 1s long task with the message.
With async/await:
const cumin = require('cumin')();
await cumin.enqueue(...); // Await waits till enqueue is complete
cumin.listen('mailer', async mailJob => { // Async function, no `done`.
...
});
One of the benefits of having a distributed setup like the one that cumin
encourages, is that you get to shut down parts of your app so that you can do upgrades etc. without any visible impact to your users. However, because of node's eventing model, your .listen
er app might have already picked up multiple items from the queue and might still be processing them when you kill your app. To prevent loss of tasks that might be mid-flight, cumin
adds a couple of graceful-shutdown features.
When you give either a SIGINT
or a SIGTERM
kill-signal to an app that is .listen
ing to a queue, cumin
will first stop accepting more items from redis. It will then wait for all in-flight jobs to complete. Only when all pending jobs have been completed does the app shut down. This ensures that you don't lose any items from your queue.
In case you want to bypass this check and kill the app anyway, you can simply send the SIGINT
or SIGTERM
signal a second time. The second signal will force a shutdown without regards for in-flight jobs.
$ npm install cumin
var cumin = require("cumin")(redisPort, redisHost, redisOptions)
Initializes the library. All parameters are optional, and default to localhost on the default redis port. These arguments are passed along to the node-redis module for connecting to redis.
Enqueues queueData
into a queue with name queueName
. You can optionally provide a callback to be called when the enqueue operation is complete. queueName
should be a string following redis' key naming rules. queueData
should be an object that will be JSON.stringify
ied and put into the queue.
Pops items out from the queueName
queue. The callback will be called whenever an item is popped, and the queueItem
will be passed in as an argument. The second argument to the callback is a function that should be called when the processing of the queue items is complete.
Lets say you want to record some data to your DB. However, before you write to the DB, you want to apply a bunch of rules (for example, rate-limiting) where each rule is resource heavy. You could build each of these rules as separate applications, and simply pass messages between them. The advantage of this way of structuring your application is that each piece is versioned, deployed and scaled independent of the rest of the system - a much simpler problem to solve rather than figuring out how to scale the entire system.
var cumin = require("cumin")();
cumin.listen("queue1", function(queueItem, done) {
if(weShouldProcess(queueItem)) {
cumin.enqueue("queue2", queueItem);
}
done();
});
Now, some other application can .listen
to queue2
and run its own filters and/or transforms, passing the data from queue to queue, until it finally gets written to the db. Since weShouldProcess
is effectively the only thing this application does, it's easy to spin up more processes to share the load, or even run this across more physical computers, making scaling very easy. Also, in the event of a crash, the entire application doesn't crash - just one component - and messages intended for it remain recorded in redis for when it's back up. Heck, if one of your apps can be better implemented in a different language, that shouldn't be hard either!
cumin
also stores some metadata about the queue activity in redis itself, so that you can monitor the health of the queue. Check out the cumin-monitor app for a real-time queue-monitoring tool that works with cumin
.
MIT