diff --git a/README.md b/README.md index 6cfb922c5..e201fc2dc 100644 --- a/README.md +++ b/README.md @@ -119,6 +119,9 @@ Usage: * [`each`](#each) * [`eachSeries`](#eachSeries) * [`eachLimit`](#eachLimit) +* [`forEachOf`](#forEachOf) +* [`forEachOfSeries`](#forEachOfSeries) +* [`forEachOfLimit`](#forEachOfLimit) * [`map`](#map) * [`mapSeries`](#mapSeries) * [`mapLimit`](#mapLimit) @@ -191,7 +194,8 @@ __Arguments__ * `iterator(item, callback)` - A function to apply to each item in `arr`. The iterator is passed a `callback(err)` which must be called once it has completed. If no error has occurred, the `callback` should be run without - arguments or with an explicit `null` argument. + arguments or with an explicit `null` argument. The array index is not passed + to the iterator. If you need the index, use [`forEachOf`](#forEachOf). * `callback(err)` - A callback which is called when all `iterator` functions have finished, or an error occurs. @@ -280,6 +284,67 @@ async.eachLimit(documents, 20, requestApi, function(err){ }); ``` +--------------------------------------- + + + + +### forEachOf(obj, iterator, callback) + +Like `each`, except that it iterates over objects, and passes the key as the second argument to the iterator. + +__Arguments__ + +* `obj` - An object or array to iterate over. +* `iterator(item, key, callback)` - A function to apply to each item in `obj`. +The `key` is the item's key, or index in the case of an array. The iterator is +passed a `callback(err)` which must be called once it has completed. If no +error has occurred, the callback should be run without arguments or with an +explicit `null` argument. +* `callback(err)` - A callback which is called when all `iterator` functions have finished, or an error occurs. + +__Example__ + +```js +var obj = {dev: "/dev.json", test: "/test.json", prod: "/prod.json"}; +var configs = {}; + +async.forEachOf(obj, function (value, key, callback) { + fs.readFile(__dirname + value, "utf8", function (err, data) { + if (err) return callback(err); + try { + configs[key] = JSON.parse(data); + } catch (e) { + return callback(e); + } + callback(); + }) +}, function (err) { + if (err) console.error(err.message); + // configs is now a map of JSON data + doSomethingWith(configs); +}) +``` + +--------------------------------------- + + + + +### forEachOfSeries(obj, iterator, callback) + +Like [`forEachOf`](#forEachOf), except only one `iterator` is run at a time. The order of execution is not guaranteed for objects, but it will be guaranteed for arrays. + +--------------------------------------- + + + + +### forEachOfLimit(obj, limit, iterator, callback) + +Like [`forEachOf`](#forEachOf), except the number of `iterator`s running at a given time is controlled by `limit`. + + --------------------------------------- diff --git a/lib/async.js b/lib/async.js index 394c41cad..87ebe47a2 100644 --- a/lib/async.js +++ b/lib/async.js @@ -68,6 +68,14 @@ return memo; }; + var _forEachOf = function (object, iterator) { + for (key in object) { + if (object.hasOwnProperty(key)) { + iterator(object[key], key); + } + } + }; + var _keys = function (obj) { if (Object.keys) { return Object.keys(obj); @@ -163,6 +171,7 @@ }; async.forEachSeries = async.eachSeries; + async.eachLimit = function (arr, limit, iterator, callback) { var fn = _eachLimit(limit); fn.apply(null, [arr, iterator, callback]); @@ -210,6 +219,115 @@ }; + + async.forEachOf = async.eachOf = function (object, iterator, callback) { + callback = callback || function () {}; + var size = object.length || _keys(object).length; + var completed = 0 + if (!size) { + return callback(); + } + _forEachOf(object, function (value, key) { + iterator(object[key], key, function (err) { + if (err) { + callback(err); + callback = function () {}; + } else { + completed += 1; + if (completed === size) { + callback(null); + } + } + }); + }); + }; + + async.forEachOfSeries = async.eachOfSeries = function (obj, iterator, callback) { + callback = callback || function () {}; + var keys = _keys(obj); + var size = keys.length; + if (!size) { + return callback(); + } + var completed = 0; + var iterate = function () { + var sync = true; + var key = keys[completed]; + iterator(obj[key], key, function (err) { + if (err) { + callback(err); + callback = function () {}; + } + else { + completed += 1; + if (completed >= size) { + callback(null); + } + else { + if (sync) { + async.nextTick(iterate); + } + else { + iterate(); + } + } + } + }); + sync = false; + }; + iterate(); + }; + + + + async.forEachOfLimit = async.eachOfLimit = function (obj, limit, iterator, callback) { + _forEachOfLimit(limit)(obj, iterator, callback); + }; + + var _forEachOfLimit = function (limit) { + + return function (obj, iterator, callback) { + callback = callback || function () {}; + var keys = _keys(obj); + var size = keys.length; + if (!size || limit <= 0) { + return callback(); + } + var completed = 0; + var started = 0; + var running = 0; + + (function replenish () { + if (completed >= size) { + return callback(); + } + + while (running < limit && started < size) { + started += 1; + running += 1; + var key = keys[started - 1]; + iterator(obj[key], key, function (err) { + if (err) { + callback(err); + callback = function () {}; + } + else { + completed += 1; + running -= 1; + if (completed >= size) { + callback(); + } + else { + replenish(); + } + } + }); + } + })(); + }; + }; + + var doParallel = function (fn) { return function () { var args = Array.prototype.slice.call(arguments); diff --git a/test/test-async.js b/test/test-async.js index 2d46b8cef..73e712214 100755 --- a/test/test-async.js +++ b/test/test-async.js @@ -17,6 +17,13 @@ function eachIterator(args, x, callback) { }, x*25); } +function forEachOfIterator(args, value, key, callback) { + setTimeout(function(){ + args.push(key, value); + callback(); + }, value*25); +} + function mapIterator(call_order, x, callback) { setTimeout(function(){ call_order.push(x); @@ -43,6 +50,13 @@ function eachNoCallbackIterator(test, x, callback) { test.done(); } +function forEachOfNoCallbackIterator(test, x, key, callback) { + test.equal(x, 1); + test.equal(key, "a"); + callback(); + test.done(); +} + function getFunctionsObject(call_order) { return { one: function(callback){ @@ -619,7 +633,7 @@ exports['retry as an embedded task'] = function(test) { var retryResult = 'RETRY'; var fooResults; var retryResults; - + async.auto({ foo: function(callback, results){ fooResults = results; @@ -1165,6 +1179,47 @@ exports['forEach alias'] = function (test) { test.done(); }; +exports['forEachOf'] = function(test){ + var args = []; + async.forEachOf({ a: 1, b: 2 }, forEachOfIterator.bind(this, args), function(err){ + test.same(args, ["a", 1, "b", 2]); + test.done(); + }); +}; + +exports['forEachOf empty object'] = function(test){ + test.expect(1); + async.forEachOf({}, function(value, key, callback){ + test.ok(false, 'iterator should not be called'); + callback(); + }, function(err) { + test.ok(true, 'should call callback'); + }); + setTimeout(test.done, 25); +}; + +exports['forEachOf error'] = function(test){ + test.expect(1); + async.forEachOf({ a: 1, b: 2 }, function(value, key, callback) { + callback('error'); + }, function(err){ + test.equals(err, 'error'); + }); + setTimeout(test.done, 50); +}; + +exports['forEachOf no callback'] = function(test){ + async.forEachOf({ a: 1 }, forEachOfNoCallbackIterator.bind(this, test)); +}; + +exports['forEachOf with array'] = function(test){ + var args = []; + async.forEachOf([ "a", "b" ], forEachOfIterator.bind(this, args), function(err){ + test.same(args, [0, "a", 1, "b"]); + test.done(); + }); +}; + exports['eachSeries'] = function(test){ var args = []; async.eachSeries([1,3,2], eachIterator.bind(this, args), function(err){ @@ -1201,10 +1256,6 @@ exports['eachSeries no callback'] = function(test){ async.eachSeries([1], eachNoCallbackIterator.bind(this, test)); }; -exports['forEachSeries alias'] = function (test) { - test.strictEqual(async.eachSeries, async.forEachSeries); - test.done(); -}; exports['eachLimit'] = function(test){ var args = []; @@ -1293,11 +1344,153 @@ exports['eachLimit synchronous'] = function(test){ }); }; +exports['forEachSeries alias'] = function (test) { + test.strictEqual(async.eachSeries, async.forEachSeries); + test.done(); +}; + +exports['forEachOfSeries'] = function(test){ + var args = []; + async.forEachOfSeries({ a: 1, b: 2 }, forEachOfIterator.bind(this, args), function(err){ + test.same(args, [ "a", 1, "b", 2 ]); + test.done(); + }); +}; + +exports['forEachOfSeries empty object'] = function(test){ + test.expect(1); + async.forEachOfSeries({}, function(x, callback){ + test.ok(false, 'iterator should not be called'); + callback(); + }, function(err){ + test.ok(true, 'should call callback'); + }); + setTimeout(test.done, 25); +}; + +exports['forEachOfSeries error'] = function(test){ + test.expect(2); + var call_order = []; + async.forEachOfSeries({ a: 1, b: 2 }, function(value, key, callback){ + call_order.push(value, key); + callback('error'); + }, function(err){ + test.same(call_order, [ 1, "a" ]); + test.equals(err, 'error'); + }); + setTimeout(test.done, 50); +}; + +exports['forEachOfSeries no callback'] = function(test){ + async.forEachOfSeries({ a: 1 }, forEachOfNoCallbackIterator.bind(this, test)); +}; + +exports['forEachOfSeries with array'] = function(test){ + var args = []; + async.forEachOfSeries([ "a", "b" ], forEachOfIterator.bind(this, args), function(err){ + test.same(args, [ 0, "a", 1, "b" ]); + test.done(); + }); +}; + exports['forEachLimit alias'] = function (test) { test.strictEqual(async.eachLimit, async.forEachLimit); test.done(); }; +exports['forEachOfLimit'] = function(test){ + var args = []; + var obj = { a: 1, b: 2, c: 3, d: 4 }; + async.forEachOfLimit(obj, 2, function(value, key, callback){ + setTimeout(function(){ + args.push(value, key); + callback(); + }, value * 5); + }, function(err){ + test.same(args, [ 1, "a", 2, "b", 3, "c", 4, "d" ]); + test.done(); + }); +}; + +exports['forEachOfLimit empty object'] = function(test){ + test.expect(1); + async.forEachOfLimit({}, 2, function(value, key, callback){ + test.ok(false, 'iterator should not be called'); + callback(); + }, function(err){ + test.ok(true, 'should call callback'); + }); + setTimeout(test.done, 25); +}; + +exports['forEachOfLimit limit exceeds size'] = function(test){ + var args = []; + var obj = { a: 1, b: 2, c: 3, d: 4, e: 5 }; + async.forEachOfLimit(obj, 10, forEachOfIterator.bind(this, args), function(err){ + test.same(args, [ "a", 1, "b", 2, "c", 3, "d", 4, "e", 5 ]); + test.done(); + }); +}; + +exports['forEachOfLimit limit equal size'] = function(test){ + var args = []; + var obj = { a: 1, b: 2, c: 3, d: 4, e: 5 }; + async.forEachOfLimit(obj, 5, forEachOfIterator.bind(this, args), function(err){ + test.same(args, [ "a", 1, "b", 2, "c", 3, "d", 4, "e", 5 ]); + test.done(); + }); +}; + +exports['forEachOfLimit zero limit'] = function(test){ + test.expect(1); + async.forEachOfLimit({ a: 1, b: 2 }, 0, function(x, callback){ + test.ok(false, 'iterator should not be called'); + callback(); + }, function(err){ + test.ok(true, 'should call callback'); + }); + setTimeout(test.done, 25); +}; + +exports['forEachOfLimit error'] = function(test){ + test.expect(2); + var obj = { a: 1, b: 2, c: 3, d: 4, e: 5 }; + var call_order = []; + + async.forEachOfLimit(obj, 3, function(value, key, callback){ + call_order.push(value, key); + if (value === 2) { + callback('error'); + } + }, function(err){ + test.same(call_order, [ 1, "a", 2, "b" ]); + test.equals(err, 'error'); + }); + setTimeout(test.done, 25); +}; + +exports['forEachOfLimit no callback'] = function(test){ + async.forEachOfLimit({ a: 1 }, 1, forEachOfNoCallbackIterator.bind(this, test)); +}; + +exports['forEachOfLimit synchronous'] = function(test){ + var args = []; + var obj = { a: 1, b: 2 }; + async.forEachOfLimit(obj, 5, forEachOfIterator.bind(this, args), function(err){ + test.same(args, [ "a", 1, "b", 2 ]); + test.done(); + }); +}; + +exports['forEachOfLimit with array'] = function(test){ + var args = []; + var arr = [ "a", "b" ] + async.forEachOfLimit(arr, 1, forEachOfIterator.bind(this, args), function (err) { + test.same(args, [ 0, "a", 1, "b" ]); + test.done(); + }); +}; + exports['map'] = function(test){ var call_order = []; async.map([1,3,2], mapIterator.bind(this, call_order), function(err, results){ @@ -2806,20 +2999,20 @@ exports['cargo bulk task'] = function (test) { }; exports['cargo drain once'] = function (test) { - + var c = async.cargo(function (tasks, callback) { callback(); }, 3); - + var drainCounter = 0; c.drain = function () { drainCounter++; } - + for(var i = 0; i < 10; i++){ c.push(i); } - + setTimeout(function(){ test.equal(drainCounter, 1); test.done(); @@ -2827,17 +3020,17 @@ exports['cargo drain once'] = function (test) { }; exports['cargo drain twice'] = function (test) { - + var c = async.cargo(function (tasks, callback) { callback(); }, 3); - + var loadCargo = function(){ for(var i = 0; i < 10; i++){ c.push(i); } }; - + var drainCounter = 0; c.drain = function () { drainCounter++; @@ -3161,7 +3354,7 @@ exports['queue started'] = function(test) { var calls = []; var q = async.queue(function(task, cb) {}); - + test.equal(q.started, false); q.push([]); test.equal(q.started, true);