Skip to content

Commit

Permalink
Fix the infinite loop created by setImmediate() in threads example (#564
Browse files Browse the repository at this point in the history
)

* fix the infinite loop created by setImmediate() in threads example

* add handling for dead workers
  • Loading branch information
m4heshd authored Mar 30, 2021
1 parent 351411d commit 5493d14
Showing 1 changed file with 17 additions and 6 deletions.
23 changes: 17 additions & 6 deletions docs/threads.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,28 +36,37 @@ exports.asyncQuery = (sql, ...parameters) => {
reject,
message: { sql, parameters },
});
callWorkers();
});
};

/*
Call poll() in every worker upon new asyncQuery request.
*/

let workers = [];
function callWorkers() {
workers.forEach((worker) => {
worker.poll();
})
}

/*
Spawn workers that try to drain the queue.
*/

os.cpus().forEach(function spawn() {
const worker = new Worker('./worker.js');
const threadId = worker.threadId;

let job = null; // Current item from the queue
let error = null; // Error that caused the worker to crash
let timer = null; // Timer used for polling

function poll() {
if (queue.length) {
if (!job && queue.length) {
// If there's a job in the queue, send it to the worker
job = queue.shift();
worker.postMessage(job.message);
} else {
// Otherwise, check again later
timer = setImmediate(poll);
}
}

Expand All @@ -73,7 +82,7 @@ os.cpus().forEach(function spawn() {
error = err;
})
.on('exit', (code) => {
clearImmediate(timer);
workers = workers.filter(w => w.threadId !== threadId);
if (job) {
job.reject(error || new Error('worker died'));
}
Expand All @@ -82,5 +91,7 @@ os.cpus().forEach(function spawn() {
spawn(); // Worker died, so spawn a new one
}
});

workers.push({threadId, poll});
});
```

0 comments on commit 5493d14

Please sign in to comment.