Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ensureAsync #769

Merged
merged 4 commits into from
May 26, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ Usage:

* [`memoize`](#memoize)
* [`unmemoize`](#unmemoize)
* [`ensureAsync`](#ensureAsync)
* [`log`](#log)
* [`dir`](#dir)
* [`noConflict`](#noConflict)
Expand Down Expand Up @@ -1657,6 +1658,41 @@ __Arguments__

* `fn` - the memoized function

---------------------------------------

<a name="ensureAsync" />
### ensureAsync(fn)

Wrap an async function and ensure it calls its callback on a later tick of the event loop. If the function already calls its callback on a next tick, no extra deferral is added. This is useful for preventing stack overflows (`RangeError: Maximum call stack size exceeded`) and generally keeping [Zalgo](http://blog.izs.me/post/59142742143/designing-apis-for-asynchrony) contained.

__Arguments__

* `fn` - an async function, one that expects a node-style callback as its last argument

Returns a wrapped function with the exact same call signature as the function passed in.

__Example__

```js
function sometimesAsync(arg, callback) {
if (cache[arg]) {
return callback(null, cache[arg]); // this would be synchronous!!
} else {
doSomeIO(arg, callback); // this IO would be asynchronous
}
}

// this has a risk of stack overflows if many results are cached in a row
async.mapSeries(args, sometimesAsync, done);

// this will defer sometimesAsync's callback if necessary,
// preventing stack overflows
async.mapSeries(args, async.ensureAsync(sometimesAsync), done);

```

---------------------------------------

<a name="log" />
### log(function, arguments)

Expand Down
31 changes: 26 additions & 5 deletions lib/async.js
Original file line number Diff line number Diff line change
Expand Up @@ -1255,18 +1255,39 @@
async.applyEachSeries = doSeries(_applyEach);

async.forever = function (fn, callback) {
var done = only_once(callback || noop);
var task = ensureAsync(fn);
function next(err) {
if (err) {
if (callback) {
return callback(err);
}
throw err;
return done(err);
}
fn(next);
task(next);
}
next();
};

function ensureAsync(fn) {
return function (/*...args, callback*/) {
var args = _baseSlice(arguments);
var callback = args.pop();
args.push(function () {
var innerArgs = arguments;
if (sync) {
async.setImmediate(function () {
callback.apply(null, innerArgs);
});
} else {
callback.apply(null, innerArgs);
}
});
var sync = true;
fn.apply(this, args);
sync = false;
};
}

async.ensureAsync = ensureAsync;

// Node.js
if (typeof module !== 'undefined' && module.exports) {
module.exports = async;
Expand Down
43 changes: 32 additions & 11 deletions perf/benchmark.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

var _ = require("lodash");
var Benchmark = require("benchmark");
var benchOptions = {defer: true, minSamples: 1, maxTime: 2};
var exec = require("child_process").exec;
var fs = require("fs");
var path = require("path");
Expand All @@ -16,8 +15,11 @@ var args = require("yargs")
.alias("g", "grep")
.default("g", ".*")
.describe("i", "skip benchmarks whose names match this regex")
.alias("g", "reject")
.alias("i", "reject")
.default("i", "^$")
.describe("l", "maximum running time per test (in seconds)")
.alias("l", "limit")
.default("l", 2)
.help('h')
.alias('h', 'help')
.example('$0 0.9.2 0.9.0', 'Compare v0.9.2 with v0.9.0')
Expand All @@ -33,6 +35,7 @@ var reject = new RegExp(args.i, "i");
var version0 = args._[0] || require("../package.json").version;
var version1 = args._[1] || "current";
var versionNames = [version0, version1];
var benchOptions = {defer: true, minSamples: 1, maxTime: +args.l};
var versions;
var wins = {};
var totalTime = {};
Expand Down Expand Up @@ -120,16 +123,30 @@ function doesNotMatch(suiteConfig) {
function createSuite(suiteConfig) {
var suite = new Benchmark.Suite();
var args = suiteConfig.args;
var errored = false;

function addBench(version, versionName) {
var name = suiteConfig.name + " " + versionName;

try {
suiteConfig.setup(1);
suiteConfig.fn(version, function () {});
} catch (e) {
console.error(name + " Errored");
errored = true;
return;
}

suite.add(name, function (deferred) {
suiteConfig.fn(version, function () {
deferred.resolve();
});
}, _.extend({
versionName: versionName,
setup: _.partial.apply(null, [suiteConfig.setup].concat(args))
setup: _.partial.apply(null, [suiteConfig.setup].concat(args)),
onError: function (err) {
console.log(err.stack);
}
}, benchOptions));
}

Expand All @@ -139,18 +156,22 @@ function createSuite(suiteConfig) {

return suite.on('cycle', function(event) {
var mean = event.target.stats.mean * 1000;
console.log(event.target + ", " + (+mean.toPrecision(2)) + "ms per run");
console.log(event.target + ", " + (+mean.toPrecision(3)) + "ms per run");
var version = event.target.options.versionName;
if (errored) return;
totalTime[version] += mean;
})
.on('error', function (err) { console.error(err); })
.on('complete', function() {
var fastest = this.filter('fastest');
if (fastest.length === 2) {
console.log("Tie");
} else {
var winner = fastest[0].options.versionName;
console.log(winner + ' is faster');
wins[winner]++;
if (!errored) {
var fastest = this.filter('fastest');
if (fastest.length === 2) {
console.log("Tie");
} else {
var winner = fastest[0].options.versionName;
console.log(winner + ' is faster');
wins[winner]++;
}
}
console.log("--------------------------------------");
});
Expand Down
24 changes: 24 additions & 0 deletions perf/suites.js
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,30 @@ module.exports = [
fn: function (async, done) {
setTimeout(done, 0);
}
},
{
name: "ensureAsync sync",
fn: function (async, done) {
async.ensureAsync(function (cb) {
cb();
})(done);
}
},
{
name: "ensureAsync async",
fn: function (async, done) {
async.ensureAsync(function (cb) {
setImmediate(cb);
})(done);
}
},
{
name: "ensureAsync async noWrap",
fn: function (async, done) {
(function (cb) {
setImmediate(cb);
}(done));
}
}
];

86 changes: 80 additions & 6 deletions test/test-async.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,10 @@ function getFunctionsObject(call_order) {
};
}

exports['forever'] = function (test) {
test.expect(1);
exports['forever'] = {

'async': function (test) {
test.expect(2);
var counter = 0;
function addOne(callback) {
counter++;
Expand All @@ -94,8 +96,28 @@ exports['forever'] = function (test) {
}
async.forever(addOne, function (err) {
test.equal(err, 'too big!');
test.equal(counter, 50);
test.done();
});
},

'sync': function (test) {
test.expect(2);
var counter = 0;
function addOne(callback) {
counter++;
if (counter === 50000) {
return callback('too big!');
}
callback();
}
async.forever(addOne, function (err) {
test.equal(err, 'too big!');
test.equal(counter, 50000);
test.done();
});
}

};

exports['applyEach'] = function (test) {
Expand Down Expand Up @@ -1030,12 +1052,12 @@ exports['parallel does not continue replenishing after error'] = function (test)
}
setTimeout(function(){
callback();
}, delay);
}, delay);
}

async.parallelLimit(arr, limit, function(x, callback) {

}, function(err){});
}, function(err){});

setTimeout(function(){
test.equal(started, 3);
Expand Down Expand Up @@ -1438,7 +1460,7 @@ exports['eachLimit does not continue replenishing after error'] = function (test
setTimeout(function(){
callback();
}, delay);
}, function(err){});
}, function(err){});

setTimeout(function(){
test.equal(started, 3);
Expand Down Expand Up @@ -1743,7 +1765,7 @@ exports['mapLimit does not continue replenishing after error'] = function (test)
setTimeout(function(){
callback();
}, delay);
}, function(err){});
}, function(err){});

setTimeout(function(){
test.equal(started, 3);
Expand Down Expand Up @@ -3561,3 +3583,55 @@ exports['queue started'] = function(test) {

};

exports['ensureAsync'] = {
'defer sync functions': function (test) {
var sync = true;
async.ensureAsync(function (arg1, arg2, cb) {
test.equal(arg1, 1);
test.equal(arg2, 2);
cb(null, 4, 5);
})(1, 2, function (err, arg4, arg5) {
test.equal(err, null);
test.equal(arg4, 4);
test.equal(arg5, 5);
test.ok(!sync, 'callback called on same tick');
test.done();
});
sync = false;
},

'do not defer async functions': function (test) {
var sync = false;
async.ensureAsync(function (arg1, arg2, cb) {
test.equal(arg1, 1);
test.equal(arg2, 2);
async.setImmediate(function () {
sync = true;
cb(null, 4, 5);
sync = false;
});
})(1, 2, function (err, arg4, arg5) {
test.equal(err, null);
test.equal(arg4, 4);
test.equal(arg5, 5);
test.ok(sync, 'callback called on next tick');
test.done();
});
},

'double wrapping': function (test) {
var sync = true;
async.ensureAsync(async.ensureAsync(function (arg1, arg2, cb) {
test.equal(arg1, 1);
test.equal(arg2, 2);
cb(null, 4, 5);
}))(1, 2, function (err, arg4, arg5) {
test.equal(err, null);
test.equal(arg4, 4);
test.equal(arg5, 5);
test.ok(!sync, 'callback called on same tick');
test.done();
});
sync = false;
}
};