From baf62a0a9b796e2ce5ee9c39361ef7c22ac9a03a Mon Sep 17 00:00:00 2001 From: Adam Date: Wed, 28 Oct 2015 21:49:35 +0100 Subject: [PATCH 1/3] Bug fix #945: pause in queue with concurrency doesn't pause --- lib/async.js | 4 ++++ test/test-async.js | 31 +++++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+) diff --git a/lib/async.js b/lib/async.js index 888934402..700f30767 100644 --- a/lib/async.js +++ b/lib/async.js @@ -931,6 +931,10 @@ process: function () { if (!q.paused && workers < q.concurrency && q.tasks.length) { while(workers < q.concurrency && q.tasks.length){ + if (q.paused) { + return; + } + var tasks = q.payload ? q.tasks.splice(0, q.payload) : q.tasks.splice(0, q.tasks.length); diff --git a/test/test-async.js b/test/test-async.js index ac72fedf6..a63a8bbca 100755 --- a/test/test-async.js +++ b/test/test-async.js @@ -3497,6 +3497,37 @@ exports['queue'] = { }, 800); }, + 'pause in worker with concurrency': function(test) { + test.expect(1); + var call_order = []; + var q = async.queue(function (task, callback) { + if (task.isLongRunning) { + q.pause(); + setTimeout(function () { + call_order.push(task.id); + q.resume(); + callback(); + }, 500); + } + else { + call_order.push(task.id); + // call_order.push('timeout ' + elapsed()); + callback(); + } + }, 10); + + q.push({ id: 1, isLongRunning: true}); + q.push({ id: 2 }); + q.push({ id: 3 }); + q.push({ id: 4 }); + q.push({ id: 5 }); + + setTimeout(function () { + test.same(call_order, [1, 2, 3, 4, 5]); + test.done(); + }, 1000); + }, + 'pause with concurrency': function(test) { test.expect(4); var call_order = [], From 19f0d0ff3283fecd6a09c64c7858ab6720fea57a Mon Sep 17 00:00:00 2001 From: Adam Date: Wed, 28 Oct 2015 21:52:18 +0100 Subject: [PATCH 2/3] Bug fix #945: remove commented code --- test/test-async.js | 1 - 1 file changed, 1 deletion(-) diff --git a/test/test-async.js b/test/test-async.js index a63a8bbca..5777744fc 100755 --- a/test/test-async.js +++ b/test/test-async.js @@ -3511,7 +3511,6 @@ exports['queue'] = { } else { call_order.push(task.id); - // call_order.push('timeout ' + elapsed()); callback(); } }, 10); From 259c26c7668d93b2346cacdeeb8b023c5af0901f Mon Sep 17 00:00:00 2001 From: Adam Date: Thu, 29 Oct 2015 00:58:32 +0100 Subject: [PATCH 3/3] Clean up --- lib/async.js | 31 +++++++++++++------------------ 1 file changed, 13 insertions(+), 18 deletions(-) diff --git a/lib/async.js b/lib/async.js index 700f30767..5c178197b 100644 --- a/lib/async.js +++ b/lib/async.js @@ -929,28 +929,23 @@ _insert(q, data, true, callback); }, process: function () { - if (!q.paused && workers < q.concurrency && q.tasks.length) { - while(workers < q.concurrency && q.tasks.length){ - if (q.paused) { - return; - } + while(!q.paused && workers < q.concurrency && q.tasks.length){ - var tasks = q.payload ? - q.tasks.splice(0, q.payload) : - q.tasks.splice(0, q.tasks.length); + var tasks = q.payload ? + q.tasks.splice(0, q.payload) : + q.tasks.splice(0, q.tasks.length); - var data = _map(tasks, function (task) { - return task.data; - }); + var data = _map(tasks, function (task) { + return task.data; + }); - if (q.tasks.length === 0) { - q.empty(); - } - workers += 1; - workersList.push(tasks[0]); - var cb = only_once(_next(q, tasks)); - worker(data, cb); + if (q.tasks.length === 0) { + q.empty(); } + workers += 1; + workersList.push(tasks[0]); + var cb = only_once(_next(q, tasks)); + worker(data, cb); } }, length: function () {