Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

Commit

Permalink
fix: race condition causing Database is not open error
Browse files Browse the repository at this point in the history
[`cleanup`](https://github.com/ipfs/js-ipfs/blob/7d9b006d0b0542651cbaa540d5f22a0112ae09bd/src/cli/bin.js#L109) closes the DB but `yargs-promise` does not wait for async stuff in the command handler to finish, and executes that promise chain immediately after the handler is executed. So it's a race condition. On Windows, _sometimes_, the database is closed before the async stuff from the handler completes.

This PR changes the CLI command handlers to always pass a promise to `resolve` function that `yargs-promise` adds to the context (`argv`). This makes `yargs-promise` wait for it to be resolved before continuing the promise chain and closing the DB.

Since I had to edit all of the commands to make them use the `resolve` function and introduce promises, I decided to take the opportunity to refactor the CLI commands to use async/await. It should help towards #1670.

License: MIT
Signed-off-by: Alan Shaw <alan.shaw@protocol.ai>
  • Loading branch information
alanshaw committed Jan 21, 2019
1 parent 00cb494 commit f34df6f
Show file tree
Hide file tree
Showing 67 changed files with 563 additions and 742 deletions.
6 changes: 5 additions & 1 deletion src/cli/bin.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ if (args[0] === 'daemon' || args[0] === 'init') {
.completion()
.command(require('./commands/daemon'))
.command(require('./commands/init'))
.parse(args)

new YargsPromise(cli).parse(args)
.then(({ data }) => {
if (data) print(data)
})
} else {
// here we have to make a separate yargs instance with
// only the `api` option because we need this before doing
Expand Down
129 changes: 66 additions & 63 deletions src/cli/commands/add.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,52 +2,56 @@

const sortBy = require('lodash/sortBy')
const pull = require('pull-stream')
const getFolderSize = require('get-folder-size')
const promisify = require('promisify-es6')
const getFolderSize = promisify(require('get-folder-size'))
const byteman = require('byteman')
const reduce = require('async/reduce')
const mh = require('multihashes')
const multibase = require('multibase')
const toPull = require('stream-to-pull-stream')
const { print, isDaemonOn, createProgressBar } = require('../utils')
const { cidToString } = require('../../utils/cid')
const globSource = require('../../utils/files/glob-source')

function getTotalBytes (paths, cb) {
reduce(paths, 0, (total, path, cb) => {
getFolderSize(path, (err, size) => {
if (err) return cb(err)
cb(null, total + size)
})
}, cb)
/**
 * Sum the on-disk sizes of the given paths.
 *
 * Sizes are queried concurrently via the promisified `getFolderSize`
 * helper declared at the top of this file.
 *
 * @param {string[]} paths - File or directory paths to measure.
 * @returns {Promise<number>} Total size of all paths in bytes.
 * @throws Rejects if any underlying size lookup fails.
 */
async function getTotalBytes (paths) {
  // NOTE: the trailing `cb` parameter from the callback era was dropped —
  // it was unused after the async/await conversion, and the caller now
  // awaits the returned promise (see `await getTotalBytes(argv.file)`).
  const sizes = await Promise.all(paths.map(p => getFolderSize(p)))
  return sizes.reduce((total, size) => total + size, 0)
}

function addPipeline (source, addStream, options) {
pull(
source,
addStream,
pull.collect((err, added) => {
if (err) {
      // Tweak the error message and add more relevant info for the CLI
if (err.code === 'ERR_DIR_NON_RECURSIVE') {
err.message = `'${err.path}' is a directory, use the '-r' flag to specify directories`
return new Promise((resolve, reject) => {
pull(
source,
addStream,
pull.collect((err, added) => {
if (err) {
        // Tweak the error message and add more relevant info for the CLI
if (err.code === 'ERR_DIR_NON_RECURSIVE') {
err.message = `'${err.path}' is a directory, use the '-r' flag to specify directories`
}
return reject(err)
}

if (options.silent) return resolve()

if (options.quieter) {
print(added.pop().hash)
return resolve()
}
throw err
}

if (options.silent) return
if (options.quieter) return print(added.pop().hash)

sortBy(added, 'path')
.reverse()
.map((file) => {
const log = options.quiet ? [] : ['added']
log.push(cidToString(file.hash, { base: options.cidBase }))
if (!options.quiet && file.path.length > 0) log.push(file.path)
return log.join(' ')
})
.forEach((msg) => print(msg))
})
)
sortBy(added, 'path')
.reverse()
.map((file) => {
const log = options.quiet ? [] : ['added']
log.push(cidToString(file.hash, { base: options.cidBase }))
if (!options.quiet && file.path.length > 0) log.push(file.path)
return log.join(' ')
})
.forEach((msg) => print(msg))

resolve()
})
)
})
}

module.exports = {
Expand Down Expand Up @@ -140,46 +144,45 @@ module.exports = {
},

handler (argv) {
const { ipfs } = argv
const options = {
strategy: argv.trickle ? 'trickle' : 'balanced',
shardSplitThreshold: argv.enableShardingExperiment
? argv.shardSplitThreshold
: Infinity,
cidVersion: argv.cidVersion,
rawLeaves: argv.rawLeaves,
onlyHash: argv.onlyHash,
hashAlg: argv.hash,
wrapWithDirectory: argv.wrapWithDirectory,
pin: argv.pin,
chunker: argv.chunker
}

if (options.enableShardingExperiment && isDaemonOn()) {
throw new Error('Error: Enabling the sharding experiment should be done on the daemon')
}
argv.resolve((async () => {
const { ipfs } = argv
const options = {
strategy: argv.trickle ? 'trickle' : 'balanced',
shardSplitThreshold: argv.enableShardingExperiment
? argv.shardSplitThreshold
: Infinity,
cidVersion: argv.cidVersion,
rawLeaves: argv.rawLeaves,
onlyHash: argv.onlyHash,
hashAlg: argv.hash,
wrapWithDirectory: argv.wrapWithDirectory,
pin: argv.pin,
chunker: argv.chunker
}

const source = argv.file
? globSource(...argv.file, { recursive: argv.recursive })
: toPull.source(process.stdin) // Pipe directly to ipfs.add
if (options.enableShardingExperiment && isDaemonOn()) {
throw new Error('Error: Enabling the sharding experiment should be done on the daemon')
}

const adder = ipfs.addPullStream(options)
const source = argv.file
? globSource(...argv.file, { recursive: argv.recursive })
: toPull.source(process.stdin) // Pipe directly to ipfs.add

// No progress or piping directly to ipfs.add: no need to getTotalBytes
if (!argv.progress || !argv.file) {
return addPipeline(source, adder, argv)
}
const adder = ipfs.addPullStream(options)

getTotalBytes(argv.file, (err, totalBytes) => {
if (err) throw err
// No progress or piping directly to ipfs.add: no need to getTotalBytes
if (!argv.progress || !argv.file) {
return addPipeline(source, adder, argv)
}

const totalBytes = await getTotalBytes(argv.file)
const bar = createProgressBar(totalBytes)

options.progress = byteLength => {
bar.update(byteLength / totalBytes, { progress: byteman(byteLength, 2, 'MB') })
}

addPipeline(source, adder, argv)
})
return addPipeline(source, adder, argv)
})())
}
}
11 changes: 4 additions & 7 deletions src/cli/commands/bitswap/stat.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,9 @@ module.exports = {
}
},

handler ({ ipfs, cidBase }) {
ipfs.bitswap.stat((err, stats) => {
if (err) {
throw err
}

handler ({ ipfs, cidBase, resolve }) {
resolve((async () => {
const stats = await ipfs.bitswap.stat()
stats.wantlist = stats.wantlist.map(k => cidToString(k['/'], { base: cidBase, upgrade: false }))
stats.peers = stats.peers || []

Expand All @@ -34,6 +31,6 @@ module.exports = {
${stats.wantlist.join('\n ')}
partners [${stats.peers.length}]
${stats.peers.join('\n ')}`)
})
})())
}
}
10 changes: 4 additions & 6 deletions src/cli/commands/bitswap/unwant.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@ module.exports = {
choices: multibase.names
}
},
handler ({ ipfs, key, cidBase }) {
ipfs.bitswap.unwant(key, (err) => {
if (err) {
throw err
}
handler ({ ipfs, key, cidBase, resolve }) {
resolve((async () => {
await ipfs.bitswap.unwant(key)
print(`Key ${cidToString(key, { base: cidBase, upgrade: false })} removed from wantlist`)
})
})())
}
}
10 changes: 4 additions & 6 deletions src/cli/commands/bitswap/wantlist.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,10 @@ module.exports = {
}
},

handler ({ ipfs, peer, cidBase }) {
ipfs.bitswap.wantlist(peer, (err, list) => {
if (err) {
throw err
}
handler ({ ipfs, peer, cidBase, resolve }) {
resolve((async () => {
const list = await ipfs.bitswap.wantlist(peer)
list.Keys.forEach(k => print(cidToString(k['/'], { base: cidBase, upgrade: false })))
})
})())
}
}
11 changes: 4 additions & 7 deletions src/cli/commands/block/get.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,14 @@ module.exports = {

builder: {},

handler ({ ipfs, key }) {
ipfs.block.get(key, (err, block) => {
if (err) {
throw err
}

handler ({ ipfs, key, resolve }) {
resolve((async () => {
const block = await ipfs.block.get(key)
if (block) {
print(block.data, false)
} else {
print('Block was unwanted before it could be remotely retrieved')
}
})
})())
}
}
37 changes: 16 additions & 21 deletions src/cli/commands/block/put.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,10 @@
const bl = require('bl')
const fs = require('fs')
const multibase = require('multibase')
const promisify = require('promisify-es6')
const { print } = require('../../utils')
const { cidToString } = require('../../../utils/cid')

function addBlock (data, opts) {
const ipfs = opts.ipfs

ipfs.block.put(data, opts, (err, block) => {
if (err) {
throw err
}
print(cidToString(block.cid, { base: opts.cidBase }))
})
}

module.exports = {
command: 'put [block]',

Expand Down Expand Up @@ -48,17 +38,22 @@ module.exports = {
},

handler (argv) {
if (argv.block) {
const buf = fs.readFileSync(argv.block)
return addBlock(buf, argv)
}

process.stdin.pipe(bl((err, input) => {
if (err) {
throw err
argv.resolve((async () => {
let data

if (argv.block) {
data = await promisify(fs.readFile)(argv.block)
} else {
data = await new Promise((resolve, reject) => {
process.stdin.pipe(bl((err, input) => {
if (err) return reject(err)
resolve(input)
}))
})
}

addBlock(input, argv)
}))
const { cid } = await argv.ipfs.block.put(data, argv)
print(cidToString(cid, { base: argv.cidBase }))
})())
}
}
17 changes: 7 additions & 10 deletions src/cli/commands/block/rm.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,15 @@ module.exports = {

builder: {},

handler ({ ipfs, key }) {
if (isDaemonOn()) {
// TODO implement this once `js-ipfs-http-client` supports it
throw new Error('rm block with daemon running is not yet implemented')
}

ipfs.block.rm(key, (err) => {
if (err) {
throw err
handler ({ ipfs, key, resolve }) {
resolve((async () => {
if (isDaemonOn()) {
// TODO implement this once `js-ipfs-http-client` supports it
throw new Error('rm block with daemon running is not yet implemented')
}

await ipfs.block.rm(key)
print('removed ' + key)
})
})())
}
}
11 changes: 4 additions & 7 deletions src/cli/commands/block/stat.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,11 @@ module.exports = {
}
},

handler ({ ipfs, key, cidBase }) {
ipfs.block.stat(key, (err, stats) => {
if (err) {
throw err
}

handler ({ ipfs, key, cidBase, resolve }) {
resolve((async () => {
const stats = await ipfs.block.stat(key)
print('Key: ' + cidToString(stats.key, { base: cidBase }))
print('Size: ' + stats.size)
})
})())
}
}
11 changes: 3 additions & 8 deletions src/cli/commands/bootstrap/add.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,9 @@ module.exports = {
},

handler (argv) {
argv.ipfs.bootstrap.add(argv.peer, {
default: argv.default
}, (err, list) => {
if (err) {
throw err
}

argv.resolve((async () => {
const list = await argv.ipfs.bootstrap.add(argv.peer, { default: argv.default })
list.Peers.forEach((peer) => print(peer))
})
})())
}
}
9 changes: 3 additions & 6 deletions src/cli/commands/bootstrap/list.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,9 @@ module.exports = {
builder: {},

handler (argv) {
argv.ipfs.bootstrap.list((err, list) => {
if (err) {
throw err
}

argv.resolve((async () => {
const list = await argv.ipfs.bootstrap.list()
list.Peers.forEach((node) => print(node))
})
})())
}
}
Loading

0 comments on commit f34df6f

Please sign in to comment.