Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: make sure write starts only after read completes #1

Merged
merged 1 commit into from
Jun 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 33 additions & 19 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,43 @@ const createMutex = (name, options) => {

return {
readLock: (fn) => {
if (!readQueue) {
readQueue = new Queue({
concurrency: options.concurrency,
autoStart: false
})

const localReadQueue = readQueue

masterQueue.add(() => {
localReadQueue.start()

return localReadQueue.onIdle()
.then(() => {
if (readQueue === localReadQueue) {
readQueue = null
}
})
})
// If there's already a read queue, just add the task to it
if (readQueue) {
return readQueue.add(() => timeout(fn(), options.timeout))
}

return readQueue.add(() => timeout(fn(), options.timeout))
// Create a new read queue
readQueue = new Queue({
concurrency: options.concurrency,
autoStart: false
})
const localReadQueue = readQueue

// Add the task to the read queue
const readPromise = readQueue.add(() => timeout(fn(), options.timeout))

masterQueue.add(() => {
// Start the task only once the master queue has completed processing
// any previous tasks
localReadQueue.start()

// Once all the tasks in the read queue have completed, remove it so
// that the next read lock will occur after any write locks that were
// started in the interim
return localReadQueue.onIdle()
.then(() => {
if (readQueue === localReadQueue) {
readQueue = null
}
})
})

return readPromise
},
writeLock: (fn) => {
// Remove the read queue reference, so that any later read locks will be
// added to a new queue that starts after this write lock has been
// released
readQueue = null

return masterQueue.add(() => timeout(fn(), options.timeout))
Expand Down
19 changes: 19 additions & 0 deletions test/fixtures/process-error-handling.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
const mortice = require('../../')

const mutex = mortice()

mutex.readLock(() => {
return new Promise((resolve, reject) => {
console.info('read 1')

reject(new Error('err'))
})
})

mutex.writeLock(() => {
return new Promise((resolve, reject) => {
console.info('write 1')

resolve()
})
})
22 changes: 22 additions & 0 deletions test/fixtures/process-read-then-write.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
const mortice = require('../../')

const mutex = mortice()

mutex.readLock(() => {
return new Promise((resolve, reject) => {
console.info('read 1')

setTimeout(() => {
console.info('read 1 complete')
resolve()
}, 500)
})
})

mutex.writeLock(() => {
return new Promise((resolve, reject) => {
console.info('write 1')

resolve()
})
})
17 changes: 17 additions & 0 deletions test/process.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,20 @@ write 2
read 4`)
})
})

test('executes read then waits to start write', (t) => {
return exec('node', [path.join(__dirname, 'fixtures', 'process-read-then-write.js')])
.then(result => {
t.is(result.stdout, `read 1
read 1 complete
write 1`)
})
})

test('continues processing after error', (t) => {
return exec('node', [path.join(__dirname, 'fixtures', 'process-error-handling.js')])
.then(result => {
t.is(result.stdout, `read 1
write 1`)
})
})