Skip to content

Commit

Permalink
Merge pull request #769 from caolan/ensure_async
Browse files Browse the repository at this point in the history
ensureAsync
  • Loading branch information
aearly committed May 26, 2015
2 parents f71193a + 7c7326b commit 1b258fb
Show file tree
Hide file tree
Showing 5 changed files with 198 additions and 22 deletions.
36 changes: 36 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ Usage:

* [`memoize`](#memoize)
* [`unmemoize`](#unmemoize)
* [`ensureAsync`](#ensureAsync)
* [`log`](#log)
* [`dir`](#dir)
* [`noConflict`](#noConflict)
Expand Down Expand Up @@ -1657,6 +1658,41 @@ __Arguments__

* `fn` - the memoized function

---------------------------------------

<a name="ensureAsync" />
### ensureAsync(fn)

Wrap an async function and ensure it calls its callback on a later tick of the event loop. If the function already calls its callback on a next tick, no extra deferral is added. This is useful for preventing stack overflows (`RangeError: Maximum call stack size exceeded`) and generally keeping [Zalgo](http://blog.izs.me/post/59142742143/designing-apis-for-asynchrony) contained.

__Arguments__

* `fn` - an async function, one that expects a node-style callback as its last argument

Returns a wrapped function with the exact same call signature as the function passed in.

__Example__

```js
function sometimesAsync(arg, callback) {
if (cache[arg]) {
return callback(null, cache[arg]); // this would be synchronous!!
} else {
doSomeIO(arg, callback); // this IO would be asynchronous
}
}

// this has a risk of stack overflows if many results are cached in a row
async.mapSeries(args, sometimesAsync, done);

// this will defer sometimesAsync's callback if necessary,
// preventing stack overflows
async.mapSeries(args, async.ensureAsync(sometimesAsync), done);

```

---------------------------------------

<a name="log" />
### log(function, arguments)

Expand Down
31 changes: 26 additions & 5 deletions lib/async.js
Original file line number Diff line number Diff line change
Expand Up @@ -1255,18 +1255,39 @@
async.applyEachSeries = doSeries(_applyEach);

async.forever = function (fn, callback) {
var done = only_once(callback || noop);
var task = ensureAsync(fn);
function next(err) {
if (err) {
if (callback) {
return callback(err);
}
throw err;
return done(err);
}
fn(next);
task(next);
}
next();
};

function ensureAsync(fn) {
return function (/*...args, callback*/) {
var args = _baseSlice(arguments);
var callback = args.pop();
args.push(function () {
var innerArgs = arguments;
if (sync) {
async.setImmediate(function () {
callback.apply(null, innerArgs);
});
} else {
callback.apply(null, innerArgs);
}
});
var sync = true;
fn.apply(this, args);
sync = false;
};
}

async.ensureAsync = ensureAsync;

// Node.js
if (typeof module !== 'undefined' && module.exports) {
module.exports = async;
Expand Down
43 changes: 32 additions & 11 deletions perf/benchmark.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

var _ = require("lodash");
var Benchmark = require("benchmark");
var benchOptions = {defer: true, minSamples: 1, maxTime: 2};
var exec = require("child_process").exec;
var fs = require("fs");
var path = require("path");
Expand All @@ -16,8 +15,11 @@ var args = require("yargs")
.alias("g", "grep")
.default("g", ".*")
.describe("i", "skip benchmarks whose names match this regex")
.alias("g", "reject")
.alias("i", "reject")
.default("i", "^$")
.describe("l", "maximum running time per test (in seconds)")
.alias("l", "limit")
.default("l", 2)
.help('h')
.alias('h', 'help')
.example('$0 0.9.2 0.9.0', 'Compare v0.9.2 with v0.9.0')
Expand All @@ -33,6 +35,7 @@ var reject = new RegExp(args.i, "i");
var version0 = args._[0] || require("../package.json").version;
var version1 = args._[1] || "current";
var versionNames = [version0, version1];
var benchOptions = {defer: true, minSamples: 1, maxTime: +args.l};
var versions;
var wins = {};
var totalTime = {};
Expand Down Expand Up @@ -120,16 +123,30 @@ function doesNotMatch(suiteConfig) {
function createSuite(suiteConfig) {
var suite = new Benchmark.Suite();
var args = suiteConfig.args;
var errored = false;

function addBench(version, versionName) {
var name = suiteConfig.name + " " + versionName;

try {
suiteConfig.setup(1);
suiteConfig.fn(version, function () {});
} catch (e) {
console.error(name + " Errored");
errored = true;
return;
}

suite.add(name, function (deferred) {
suiteConfig.fn(version, function () {
deferred.resolve();
});
}, _.extend({
versionName: versionName,
setup: _.partial.apply(null, [suiteConfig.setup].concat(args))
setup: _.partial.apply(null, [suiteConfig.setup].concat(args)),
onError: function (err) {
console.log(err.stack);
}
}, benchOptions));
}

Expand All @@ -139,18 +156,22 @@ function createSuite(suiteConfig) {

return suite.on('cycle', function(event) {
var mean = event.target.stats.mean * 1000;
console.log(event.target + ", " + (+mean.toPrecision(2)) + "ms per run");
console.log(event.target + ", " + (+mean.toPrecision(3)) + "ms per run");
var version = event.target.options.versionName;
if (errored) return;
totalTime[version] += mean;
})
.on('error', function (err) { console.error(err); })
.on('complete', function() {
var fastest = this.filter('fastest');
if (fastest.length === 2) {
console.log("Tie");
} else {
var winner = fastest[0].options.versionName;
console.log(winner + ' is faster');
wins[winner]++;
if (!errored) {
var fastest = this.filter('fastest');
if (fastest.length === 2) {
console.log("Tie");
} else {
var winner = fastest[0].options.versionName;
console.log(winner + ' is faster');
wins[winner]++;
}
}
console.log("--------------------------------------");
});
Expand Down
24 changes: 24 additions & 0 deletions perf/suites.js
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,30 @@ module.exports = [
fn: function (async, done) {
setTimeout(done, 0);
}
},
{
name: "ensureAsync sync",
fn: function (async, done) {
async.ensureAsync(function (cb) {
cb();
})(done);
}
},
{
name: "ensureAsync async",
fn: function (async, done) {
async.ensureAsync(function (cb) {
setImmediate(cb);
})(done);
}
},
{
name: "ensureAsync async noWrap",
fn: function (async, done) {
(function (cb) {
setImmediate(cb);
}(done));
}
}
];

86 changes: 80 additions & 6 deletions test/test-async.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,10 @@ function getFunctionsObject(call_order) {
};
}

exports['forever'] = function (test) {
test.expect(1);
exports['forever'] = {

'async': function (test) {
test.expect(2);
var counter = 0;
function addOne(callback) {
counter++;
Expand All @@ -94,8 +96,28 @@ exports['forever'] = function (test) {
}
async.forever(addOne, function (err) {
test.equal(err, 'too big!');
test.equal(counter, 50);
test.done();
});
},

'sync': function (test) {
test.expect(2);
var counter = 0;
function addOne(callback) {
counter++;
if (counter === 50000) {
return callback('too big!');
}
callback();
}
async.forever(addOne, function (err) {
test.equal(err, 'too big!');
test.equal(counter, 50000);
test.done();
});
}

};

exports['applyEach'] = function (test) {
Expand Down Expand Up @@ -1030,12 +1052,12 @@ exports['parallel does not continue replenishing after error'] = function (test)
}
setTimeout(function(){
callback();
}, delay);
}, delay);
}

async.parallelLimit(arr, limit, function(x, callback) {

}, function(err){});
}, function(err){});

setTimeout(function(){
test.equal(started, 3);
Expand Down Expand Up @@ -1438,7 +1460,7 @@ exports['eachLimit does not continue replenishing after error'] = function (test
setTimeout(function(){
callback();
}, delay);
}, function(err){});
}, function(err){});

setTimeout(function(){
test.equal(started, 3);
Expand Down Expand Up @@ -1743,7 +1765,7 @@ exports['mapLimit does not continue replenishing after error'] = function (test)
setTimeout(function(){
callback();
}, delay);
}, function(err){});
}, function(err){});

setTimeout(function(){
test.equal(started, 3);
Expand Down Expand Up @@ -3561,3 +3583,55 @@ exports['queue started'] = function(test) {

};

exports['ensureAsync'] = {
'defer sync functions': function (test) {
var sync = true;
async.ensureAsync(function (arg1, arg2, cb) {
test.equal(arg1, 1);
test.equal(arg2, 2);
cb(null, 4, 5);
})(1, 2, function (err, arg4, arg5) {
test.equal(err, null);
test.equal(arg4, 4);
test.equal(arg5, 5);
test.ok(!sync, 'callback called on same tick');
test.done();
});
sync = false;
},

'do not defer async functions': function (test) {
var sync = false;
async.ensureAsync(function (arg1, arg2, cb) {
test.equal(arg1, 1);
test.equal(arg2, 2);
async.setImmediate(function () {
sync = true;
cb(null, 4, 5);
sync = false;
});
})(1, 2, function (err, arg4, arg5) {
test.equal(err, null);
test.equal(arg4, 4);
test.equal(arg5, 5);
test.ok(sync, 'callback called on next tick');
test.done();
});
},

'double wrapping': function (test) {
var sync = true;
async.ensureAsync(async.ensureAsync(function (arg1, arg2, cb) {
test.equal(arg1, 1);
test.equal(arg2, 2);
cb(null, 4, 5);
}))(1, 2, function (err, arg4, arg5) {
test.equal(err, null);
test.equal(arg4, 4);
test.equal(arg5, 5);
test.ok(!sync, 'callback called on same tick');
test.done();
});
sync = false;
}
};

0 comments on commit 1b258fb

Please sign in to comment.