Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

fix: enable preload on MFS commands that accept IPFS paths #2355

Merged
merged 3 commits into from
Aug 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 33 additions & 18 deletions src/core/components/files-mfs.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ const callbackify = require('callbackify')
const PassThrough = require('stream').PassThrough
const pull = require('pull-stream/pull')
const map = require('pull-stream/throughs/map')
const isIpfs = require('is-ipfs')
const { cidToString } = require('../../utils/cid')

const mapLsFile = (options = {}) => {
const long = options.long || options.l

return (file) => {
return {
hash: long ? file.cid.toBaseEncodedString(options.cidBase) : '',
hash: long ? cidToString(file.cid, { base: options.cidBase }) : '',
name: file.name,
type: long ? file.type : 0,
size: long ? file.size || 0 : 0
Expand All @@ -32,15 +34,28 @@ module.exports = self => {
repoOwner: self._options.repoOwner
})

const withPreload = fn => (...args) => {
const paths = args.filter(arg => isIpfs.ipfsPath(arg) || isIpfs.cid(arg))

if (paths.length) {
const options = args[args.length - 1]
if (options.preload !== false) {
paths.forEach(path => self._preload(path))
}
}

return fn(...args)
}

return {
cp: callbackify.variadic(methods.cp),
cp: callbackify.variadic(withPreload(methods.cp)),
flush: callbackify.variadic(methods.flush),
ls: callbackify.variadic(async (path, options = {}) => {
ls: callbackify.variadic(withPreload(async (path, options = {}) => {
const files = await all(methods.ls(path, options))

return files.map(mapLsFile(options))
}),
lsReadableStream: (path, options = {}) => {
})),
lsReadableStream: withPreload((path, options = {}) => {
const stream = toReadableStream.obj(methods.ls(path, options))
const through = new PassThrough({
objectMode: true
Expand All @@ -60,33 +75,33 @@ module.exports = self => {
})

return through
},
lsPullStream: (path, options = {}) => {
}),
lsPullStream: withPreload((path, options = {}) => {
return pull(
toPullStream.source(methods.ls(path, options)),
map(mapLsFile(options))
)
},
}),
mkdir: callbackify.variadic(methods.mkdir),
mv: callbackify.variadic(methods.mv),
read: callbackify(async (path, options = {}) => {
mv: callbackify.variadic(withPreload(methods.mv)),
read: callbackify.variadic(withPreload(async (path, options = {}) => {
return Buffer.concat(await all(methods.read(path, options)))
}),
readPullStream: (path, options = {}) => {
})),
readPullStream: withPreload((path, options = {}) => {
return toPullStream.source(methods.read(path, options))
},
readReadableStream: (path, options = {}) => {
}),
readReadableStream: withPreload((path, options = {}) => {
return toReadableStream(methods.read(path, options))
},
}),
rm: callbackify.variadic(methods.rm),
stat: callbackify(async (path, options = {}) => {
stat: callbackify.variadic(withPreload(async (path, options = {}) => {
const stats = await methods.stat(path, options)

stats.hash = stats.cid.toBaseEncodedString(options && options.cidBase)
stats.hash = cidToString(stats.cid, { base: options.cidBase })
delete stats.cid

return stats
}),
})),
write: callbackify.variadic(async (path, content, options = {}) => {
if (isPullStream.isSource(content)) {
content = pullStreamToAsyncIterator(content)
Expand Down
12 changes: 6 additions & 6 deletions src/core/preload.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ module.exports = self => {
let requests = []
const apiUris = options.addresses.map(apiAddrToUri)

const api = (cid, callback) => {
const api = (path, callback) => {
callback = callback || noop

if (typeof cid !== 'string') {
if (typeof path !== 'string') {
try {
cid = new CID(cid).toBaseEncodedString()
path = new CID(path).toBaseEncodedString()
} catch (err) {
return setImmediate(() => callback(err))
}
Expand All @@ -50,14 +50,14 @@ module.exports = self => {
const now = Date.now()

retry({ times: fallbackApiUris.length }, (cb) => {
if (stopped) return cb(new Error(`preload aborted for ${cid}`))
if (stopped) return cb(new Error(`preload aborted for ${path}`))

// Remove failed request from a previous attempt
requests = requests.filter(r => r !== request)

const apiUri = fallbackApiUris.shift()

request = preload(`${apiUri}/api/v0/refs?r=true&arg=${cid}`, cb)
request = preload(`${apiUri}/api/v0/refs?r=true&arg=${encodeURIComponent(path)}`, cb)
requests = requests.concat(request)
}, (err) => {
requests = requests.filter(r => r !== request)
Expand All @@ -66,7 +66,7 @@ module.exports = self => {
return callback(err)
}

log(`preloaded ${cid} in ${Date.now() - now}ms`)
log(`preloaded ${path} in ${Date.now() - now}ms`)
callback()
})
}
Expand Down
138 changes: 138 additions & 0 deletions test/core/preload.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ const chai = require('chai')
const dirtyChai = require('dirty-chai')
const expect = chai.expect
chai.use(dirtyChai)
const pull = require('pull-stream')
const CID = require('cids')

const MockPreloadNode = require('../utils/mock-preload-node')
const IPFS = require('../../src')
Expand Down Expand Up @@ -326,6 +328,142 @@ describe('preload', () => {
})
})
})

it('should preload content retrieved with files.ls', done => {
let dirCid

waterfall([
cb => ipfs.add({ path: `/t/${hat()}`, content: Buffer.from(hat()) }, cb),
(res, cb) => {
dirCid = res[res.length - 1].hash
MockPreloadNode.waitForCids(dirCid, cb)
},
cb => MockPreloadNode.clearPreloadCids(cb),
cb => ipfs.files.ls(`/ipfs/${dirCid}`, err => cb(err)),
cb => MockPreloadNode.waitForCids(`/ipfs/${dirCid}`, cb)
], done)
})

it('should preload content retrieved with files.ls by CID', done => {
let dirCid

waterfall([
cb => ipfs.add({ path: `/t/${hat()}`, content: Buffer.from(hat()) }, cb),
(res, cb) => {
dirCid = res[res.length - 1].hash
MockPreloadNode.waitForCids(dirCid, cb)
},
cb => MockPreloadNode.clearPreloadCids(cb),
cb => ipfs.files.ls(new CID(dirCid), err => cb(err)),
cb => MockPreloadNode.waitForCids(dirCid, cb)
], done)
})

it('should preload content retrieved with files.lsReadableStream', done => {
let dirCid

waterfall([
cb => ipfs.add({ path: `/t/${hat()}`, content: Buffer.from(hat()) }, cb),
(res, cb) => {
dirCid = res[res.length - 1].hash
MockPreloadNode.waitForCids(dirCid, cb)
},
cb => MockPreloadNode.clearPreloadCids(cb),
cb => {
ipfs.files.lsReadableStream(`/ipfs/${dirCid}`)
.on('data', () => {})
.on('error', cb)
.on('end', cb)
},
cb => MockPreloadNode.waitForCids(`/ipfs/${dirCid}`, cb)
], done)
})

it('should preload content retrieved with files.lsPullStream', done => {
let dirCid

waterfall([
cb => ipfs.add({ path: `/t/${hat()}`, content: Buffer.from(hat()) }, cb),
(res, cb) => {
dirCid = res[res.length - 1].hash
MockPreloadNode.waitForCids(dirCid, cb)
},
cb => MockPreloadNode.clearPreloadCids(cb),
cb => pull(
ipfs.files.lsPullStream(`/ipfs/${dirCid}`),
pull.onEnd(cb)
),
cb => MockPreloadNode.waitForCids(`/ipfs/${dirCid}`, cb)
], done)
})

it('should preload content retrieved with files.read', done => {
let fileCid

waterfall([
cb => ipfs.add(Buffer.from(hat()), cb),
(res, cb) => {
fileCid = res[0].hash
MockPreloadNode.waitForCids(fileCid, cb)
},
cb => MockPreloadNode.clearPreloadCids(cb),
cb => ipfs.files.read(`/ipfs/${fileCid}`, err => cb(err)),
cb => MockPreloadNode.waitForCids(`/ipfs/${fileCid}`, cb)
], done)
})

it('should preload content retrieved with files.readReadableStream', done => {
let fileCid

waterfall([
cb => ipfs.add(Buffer.from(hat()), cb),
(res, cb) => {
fileCid = res[0].hash
MockPreloadNode.waitForCids(fileCid, cb)
},
cb => MockPreloadNode.clearPreloadCids(cb),
cb => {
ipfs.files.readReadableStream(`/ipfs/${fileCid}`)
.on('data', () => {})
.on('error', cb)
.on('end', cb)
},
cb => MockPreloadNode.waitForCids(`/ipfs/${fileCid}`, cb)
], done)
})

it('should preload content retrieved with files.readPullStream', done => {
let fileCid

waterfall([
cb => ipfs.add(Buffer.from(hat()), cb),
(res, cb) => {
fileCid = res[0].hash
MockPreloadNode.waitForCids(fileCid, cb)
},
cb => MockPreloadNode.clearPreloadCids(cb),
cb => pull(
ipfs.files.readPullStream(`/ipfs/${fileCid}`),
pull.onEnd(cb)
),
cb => MockPreloadNode.waitForCids(`/ipfs/${fileCid}`, cb)
], done)
})

it('should preload content retrieved with files.stat', done => {
let fileCid

waterfall([
cb => ipfs.add(Buffer.from(hat()), cb),
(res, cb) => {
fileCid = res[0].hash
MockPreloadNode.waitForCids(fileCid, cb)
},
cb => MockPreloadNode.clearPreloadCids(cb),
cb => ipfs.files.stat(`/ipfs/${fileCid}`, err => cb(err)),
cb => MockPreloadNode.waitForCids(`/ipfs/${fileCid}`, cb)
], done)
})
})

describe('preload disabled', function () {
Expand Down