diff --git a/lib/async.js b/lib/async.js index 888934402..5c178197b 100644 --- a/lib/async.js +++ b/lib/async.js @@ -929,24 +929,23 @@ _insert(q, data, true, callback); }, process: function () { - if (!q.paused && workers < q.concurrency && q.tasks.length) { - while(workers < q.concurrency && q.tasks.length){ - var tasks = q.payload ? - q.tasks.splice(0, q.payload) : - q.tasks.splice(0, q.tasks.length); - - var data = _map(tasks, function (task) { - return task.data; - }); + while(!q.paused && workers < q.concurrency && q.tasks.length){ - if (q.tasks.length === 0) { - q.empty(); - } - workers += 1; - workersList.push(tasks[0]); - var cb = only_once(_next(q, tasks)); - worker(data, cb); + var tasks = q.payload ? + q.tasks.splice(0, q.payload) : + q.tasks.splice(0, q.tasks.length); + + var data = _map(tasks, function (task) { + return task.data; + }); + + if (q.tasks.length === 0) { + q.empty(); } + workers += 1; + workersList.push(tasks[0]); + var cb = only_once(_next(q, tasks)); + worker(data, cb); } }, length: function () { diff --git a/test/test-async.js b/test/test-async.js index ac72fedf6..5777744fc 100755 --- a/test/test-async.js +++ b/test/test-async.js @@ -3497,6 +3497,36 @@ exports['queue'] = { }, 800); }, + 'pause in worker with concurrency': function(test) { + test.expect(1); + var call_order = []; + var q = async.queue(function (task, callback) { + if (task.isLongRunning) { + q.pause(); + setTimeout(function () { + call_order.push(task.id); + q.resume(); + callback(); + }, 500); + } + else { + call_order.push(task.id); + callback(); + } + }, 10); + + q.push({ id: 1, isLongRunning: true}); + q.push({ id: 2 }); + q.push({ id: 3 }); + q.push({ id: 4 }); + q.push({ id: 5 }); + + setTimeout(function () { + test.same(call_order, [1, 2, 3, 4, 5]); + test.done(); + }, 1000); + }, + 'pause with concurrency': function(test) { test.expect(4); var call_order = [],