Skip to content

Commit

Permalink
adding forEachOfSeries and forEachOfLimit along with tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Dominic Barnes committed Feb 7, 2013
1 parent e0a51c3 commit 2a13d08
Show file tree
Hide file tree
Showing 2 changed files with 235 additions and 4 deletions.
88 changes: 87 additions & 1 deletion lib/async.js
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@

async.forEachOf = function (object, iterator, callback) {
callback = callback || function () {};
var completed = 0, size = _keys(object).length, key;
var size = object.length || _keys(object).length;
var completed = 0
if (!size) {
return callback();
}
Expand Down Expand Up @@ -173,6 +174,42 @@
iterate();
};

async.forEachOfSeries = function (obj, iterator, callback) {
callback = callback || function () {};
var keys = _keys(obj);
var size = keys.length;
if (!size) {
return callback();
}
var completed = 0;
var iterate = function () {
var sync = true;
var key = keys[completed];
iterator(obj[key], key, function (err) {
if (err) {
callback(err);
callback = function () {};
}
else {
completed += 1;
if (completed >= size) {
callback(null);
}
else {
if (sync) {
async.nextTick(iterate);
}
else {
iterate();
}
}
}
});
sync = false;
};
iterate();
};

async.forEachLimit = function (arr, limit, iterator, callback) {
var fn = _forEachLimit(limit);
fn.apply(null, [arr, iterator, callback]);
Expand Down Expand Up @@ -219,6 +256,55 @@
};


async.forEachOfLimit = function (obj, limit, iterator, callback) {
var fn = obj.constructor === Array ? _forEachOfLimit(limit) : _forEachOfLimit(limit);
fn.apply(null, [obj, iterator, callback]);
};

var _forEachOfLimit = function (limit) {

return function (obj, iterator, callback) {
callback = callback || function () {};
var keys = _keys(obj);
var size = keys.length;
if (!size || limit <= 0) {
return callback();
}
var completed = 0;
var started = 0;
var running = 0;

(function replenish () {
if (completed >= size) {
return callback();
}

while (running < limit && started < size) {
started += 1;
running += 1;
var key = keys[started - 1];
iterator(obj[key], key, function (err) {
if (err) {
callback(err);
callback = function () {};
}
else {
completed += 1;
running -= 1;
if (completed >= size) {
callback();
}
else {
replenish();
}
}
});
}
})();
};
};


var doParallel = function (fn) {
return function () {
var args = Array.prototype.slice.call(arguments);
Expand Down
151 changes: 148 additions & 3 deletions test/test-async.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ function forEachIterator(args, x, callback) {
}, x*25);
}

function forEachOfIterator(args, x, key, callback) {
function forEachOfIterator(args, value, key, callback) {
setTimeout(function(){
args.push(key, x);
args.push(key, value);
callback();
}, x*25);
}, value*25);
}

function mapIterator(call_order, x, callback) {
Expand Down Expand Up @@ -699,6 +699,14 @@ exports['forEachOf no callback'] = function(test){
async.forEachOf({ a: 1 }, forEachOfNoCallbackIterator.bind(this, test));
};

exports['forEachOf with array'] = function(test){
var args = [];
async.forEachOf([ "a", "b" ], forEachOfIterator.bind(this, args), function(err){
test.same(args, [0, "a", 1, "b"]);
test.done();
});
};

exports['forEachSeries'] = function(test){
var args = [];
async.forEachSeries([1,3,2], forEachIterator.bind(this, args), function(err){
Expand Down Expand Up @@ -735,6 +743,50 @@ exports['forEachSeries no callback'] = function(test){
async.forEachSeries([1], forEachNoCallbackIterator.bind(this, test));
};

exports['forEachOfSeries'] = function(test){
var args = [];
async.forEachOfSeries({ a: 1, b: 2 }, forEachOfIterator.bind(this, args), function(err){
test.same(args, [ "a", 1, "b", 2 ]);
test.done();
});
};

exports['forEachOfSeries empty object'] = function(test){
test.expect(1);
async.forEachOfSeries({}, function(x, callback){
test.ok(false, 'iterator should not be called');
callback();
}, function(err){
test.ok(true, 'should call callback');
});
setTimeout(test.done, 25);
};

exports['forEachOfSeries error'] = function(test){
test.expect(2);
var call_order = [];
async.forEachOfSeries({ a: 1, b: 2 }, function(value, key, callback){
call_order.push(value, key);
callback('error');
}, function(err){
test.same(call_order, [ 1, "a" ]);
test.equals(err, 'error');
});
setTimeout(test.done, 50);
};

exports['forEachOfSeries no callback'] = function(test){
async.forEachOfSeries({ a: 1 }, forEachOfNoCallbackIterator.bind(this, test));
};

exports['forEachOfSeries with array'] = function(test){
var args = [];
async.forEachOfSeries([ "a", "b" ], forEachOfIterator.bind(this, args), function(err){
test.same(args, [ 0, "a", 1, "b" ]);
test.done();
});
};

exports['forEachLimit'] = function(test){
var args = [];
var arr = [0,1,2,3,4,5,6,7,8,9];
Expand Down Expand Up @@ -822,6 +874,99 @@ exports['forEachLimit synchronous'] = function(test){
});
};

exports['forEachOfLimit'] = function(test){
var args = [];
var obj = { a: 1, b: 2, c: 3, d: 4 };
async.forEachOfLimit(obj, 2, function(value, key, callback){
setTimeout(function(){
args.push(value, key);
callback();
}, value * 5);
}, function(err){
test.same(args, [ 1, "a", 2, "b", 3, "c", 4, "d" ]);
test.done();
});
};

exports['forEachOfLimit empty object'] = function(test){
test.expect(1);
async.forEachOfLimit({}, 2, function(value, key, callback){
test.ok(false, 'iterator should not be called');
callback();
}, function(err){
test.ok(true, 'should call callback');
});
setTimeout(test.done, 25);
};

exports['forEachOfLimit limit exceeds size'] = function(test){
var args = [];
var obj = { a: 1, b: 2, c: 3, d: 4, e: 5 };
async.forEachOfLimit(obj, 10, forEachOfIterator.bind(this, args), function(err){
test.same(args, [ "a", 1, "b", 2, "c", 3, "d", 4, "e", 5 ]);
test.done();
});
};

exports['forEachOfLimit limit equal size'] = function(test){
var args = [];
var obj = { a: 1, b: 2, c: 3, d: 4, e: 5 };
async.forEachOfLimit(obj, 5, forEachOfIterator.bind(this, args), function(err){
test.same(args, [ "a", 1, "b", 2, "c", 3, "d", 4, "e", 5 ]);
test.done();
});
};

exports['forEachOfLimit zero limit'] = function(test){
test.expect(1);
async.forEachOfLimit({ a: 1, b: 2 }, 0, function(x, callback){
test.ok(false, 'iterator should not be called');
callback();
}, function(err){
test.ok(true, 'should call callback');
});
setTimeout(test.done, 25);
};

exports['forEachOfLimit error'] = function(test){
test.expect(2);
var obj = { a: 1, b: 2, c: 3, d: 4, e: 5 };
var call_order = [];

async.forEachOfLimit(obj, 3, function(value, key, callback){
call_order.push(value, key);
if (value === 2) {
callback('error');
}
}, function(err){
test.same(call_order, [ 1, "a", 2, "b" ]);
test.equals(err, 'error');
});
setTimeout(test.done, 25);
};

exports['forEachOfLimit no callback'] = function(test){
async.forEachOfLimit({ a: 1 }, 1, forEachOfNoCallbackIterator.bind(this, test));
};

exports['forEachOfLimit synchronous'] = function(test){
var args = [];
var obj = { a: 1, b: 2 };
async.forEachOfLimit(obj, 5, forEachOfIterator.bind(this, args), function(err){
test.same(args, [ "a", 1, "b", 2 ]);
test.done();
});
};

exports['forEachOfLimit with array'] = function(test){
var args = [];
var arr = [ "a", "b" ]
async.forEachOfLimit(arr, 1, forEachOfIterator.bind(this, args), function (err) {
test.same(args, [ 0, "a", 1, "b" ]);
test.done();
});
};

exports['map'] = function(test){
var call_order = [];
async.map([1,3,2], mapIterator.bind(this, call_order), function(err, results){
Expand Down

0 comments on commit 2a13d08

Please sign in to comment.