diff --git a/lib/index.js b/lib/index.js index 04d6242..5075f14 100644 --- a/lib/index.js +++ b/lib/index.js @@ -20,29 +20,43 @@ const createMutex = (name, options) => { return { readLock: (fn) => { - if (!readQueue) { - readQueue = new Queue({ - concurrency: options.concurrency, - autoStart: false - }) - - const localReadQueue = readQueue - - masterQueue.add(() => { - localReadQueue.start() - - return localReadQueue.onIdle() - .then(() => { - if (readQueue === localReadQueue) { - readQueue = null - } - }) - }) + // If there's already a read queue, just add the task to it + if (readQueue) { + return readQueue.add(() => timeout(fn(), options.timeout)) } - return readQueue.add(() => timeout(fn(), options.timeout)) + // Create a new read queue + readQueue = new Queue({ + concurrency: options.concurrency, + autoStart: false + }) + const localReadQueue = readQueue + + // Add the task to the read queue + const readPromise = readQueue.add(() => timeout(fn(), options.timeout)) + + masterQueue.add(() => { + // Start the task only once the master queue has completed processing + // any previous tasks + localReadQueue.start() + + // Once all the tasks in the read queue have completed, remove it so + // that the next read lock will occur after any write locks that were + // started in the interim + return localReadQueue.onIdle() + .then(() => { + if (readQueue === localReadQueue) { + readQueue = null + } + }) + }) + + return readPromise }, writeLock: (fn) => { + // Remove the read queue reference, so that any later read locks will be + // added to a new queue that starts after this write lock has been + // released readQueue = null return masterQueue.add(() => timeout(fn(), options.timeout)) diff --git a/test/fixtures/process-error-handling.js b/test/fixtures/process-error-handling.js new file mode 100644 index 0000000..53ce7f0 --- /dev/null +++ b/test/fixtures/process-error-handling.js @@ -0,0 +1,19 @@ +const mortice = require('../../') + +const mutex = mortice() + +mutex.readLock(() => { + return new Promise((resolve, reject) => { + console.info('read 1') + + reject(new Error('err')) + }) +}) + +mutex.writeLock(() => { + return new Promise((resolve, reject) => { + console.info('write 1') + + resolve() + }) +}) diff --git a/test/fixtures/process-read-then-write.js b/test/fixtures/process-read-then-write.js new file mode 100644 index 0000000..a895233 --- /dev/null +++ b/test/fixtures/process-read-then-write.js @@ -0,0 +1,22 @@ +const mortice = require('../../') + +const mutex = mortice() + +mutex.readLock(() => { + return new Promise((resolve, reject) => { + console.info('read 1') + + setTimeout(() => { + console.info('read 1 complete') + resolve() + }, 500) + }) +}) + +mutex.writeLock(() => { + return new Promise((resolve, reject) => { + console.info('write 1') + + resolve() + }) +}) diff --git a/test/process.test.js b/test/process.test.js index b2216d6..3c4e1a0 100644 --- a/test/process.test.js +++ b/test/process.test.js @@ -13,3 +13,20 @@ write 2 read 4`) }) }) + +test('executes read then waits to start write', (t) => { + return exec('node', [path.join(__dirname, 'fixtures', 'process-read-then-write.js')]) + .then(result => { + t.is(result.stdout, `read 1 +read 1 complete +write 1`) + }) +}) + +test('continues processing after error', (t) => { + return exec('node', [path.join(__dirname, 'fixtures', 'process-error-handling.js')]) + .then(result => { + t.is(result.stdout, `read 1 +write 1`) + }) +})