diff --git a/package.json b/package.json index 5e419b719b..1f899e6cbd 100644 --- a/package.json +++ b/package.json @@ -67,12 +67,13 @@ "array-shuffle": "^1.0.1", "async": "^2.6.1", "async-iterator-all": "^1.0.0", + "async-iterator-first": "^1.0.0", "async-iterator-to-pull-stream": "^1.3.0", "async-iterator-to-stream": "^1.1.0", "base32.js": "~0.1.0", "bignumber.js": "^9.0.0", "binary-querystring": "~0.1.2", - "bl": "^3.0.0", + "bl": "^4.0.0", "bs58": "^4.0.1", "buffer-peek-stream": "^1.0.1", "byteman": "^1.3.5", @@ -99,7 +100,7 @@ "ipfs-bitswap": "^0.26.0", "ipfs-block": "~0.8.1", "ipfs-block-service": "~0.16.0", - "ipfs-http-client": "^38.2.0", + "ipfs-http-client": "^39.0.2", "ipfs-http-response": "~0.4.0", "ipfs-mfs": "^0.13.0", "ipfs-multipart": "^0.2.0", @@ -127,7 +128,7 @@ "jsondiffpatch": "~0.3.11", "just-safe-set": "^2.1.0", "kind-of": "^6.0.2", - "ky": "^0.14.0", + "ky": "^0.15.0", "ky-universal": "~0.3.0", "libp2p": "^0.26.2", "libp2p-bootstrap": "~0.9.3", @@ -178,7 +179,7 @@ "pull-sort": "^1.0.1", "pull-stream": "^3.6.14", "pull-stream-to-async-iterator": "^1.0.2", - "pull-stream-to-stream": "^1.3.4", + "pull-stream-to-stream": "^2.0.0", "pull-traverse": "^1.0.3", "readable-stream": "^3.4.0", "receptacle": "^1.3.2", @@ -200,7 +201,7 @@ "delay": "^4.1.0", "detect-node": "^2.0.4", "dir-compare": "^1.7.3", - "execa": "^2.0.4", + "execa": "^3.0.0", "form-data": "^2.5.1", "hat": "0.0.3", "interface-ipfs-core": "^0.117.2", diff --git a/src/cli/commands/cat.js b/src/cli/commands/cat.js index e34239dc56..0a4e8be0b8 100644 --- a/src/cli/commands/cat.js +++ b/src/cli/commands/cat.js @@ -22,14 +22,9 @@ module.exports = { resolve((async () => { const ipfs = await getIpfs() - return new Promise((resolve, reject) => { - const stream = ipfs.catReadableStream(ipfsPath, { offset, length }) - - stream.on('error', reject) - stream.on('end', resolve) - - stream.pipe(process.stdout) - }) + for await (const buf of ipfs._catAsyncIterator(ipfsPath, { offset, length })) { + process.stdout.write(buf) + } })()) } } diff --git a/src/core/components/files-regular/add.js b/src/core/components/files-regular/add.js index 20d641648d..dcf9e7bf52 100644 --- a/src/core/components/files-regular/add.js +++ b/src/core/components/files-regular/add.js @@ -5,7 +5,7 @@ const all = require('async-iterator-all') module.exports = function (self) { // can't use callbackify because if `data` is a pull stream // it thinks we are passing a callback. This is why we can't have nice things. - return (data, options, callback) => { + return function add (data, options, callback) { if (!callback && typeof options === 'function') { callback = options options = {} diff --git a/src/core/components/files-regular/cat-async-iterator.js b/src/core/components/files-regular/cat-async-iterator.js new file mode 100644 index 0000000000..6b7f1af116 --- /dev/null +++ b/src/core/components/files-regular/cat-async-iterator.js @@ -0,0 +1,30 @@ +'use strict' + +const exporter = require('ipfs-unixfs-exporter') +const { normalizePath } = require('./utils') + +module.exports = function (self) { + return async function * catAsyncIterator (ipfsPath, options) { + options = options || {} + + ipfsPath = normalizePath(ipfsPath) + + if (options.preload !== false) { + const pathComponents = ipfsPath.split('/') + self._preload(pathComponents[0]) + } + + const file = await exporter(ipfsPath, self._ipld, options) + + // File may not have unixfs prop if small & imported with rawLeaves true + if (file.unixfs && file.unixfs.type.includes('dir')) { + throw new Error('this dag node is a directory') + } + + if (!file.content) { + throw new Error('this dag node has no content') + } + + yield * file.content(options) + } +} diff --git a/src/core/components/files-regular/cat-pull-stream.js b/src/core/components/files-regular/cat-pull-stream.js index a491700d7d..5ad3f79411 100644 --- a/src/core/components/files-regular/cat-pull-stream.js +++ b/src/core/components/files-regular/cat-pull-stream.js @@ -1,43 +1,9 @@ 'use strict' -const exporter = require('ipfs-unixfs-exporter') -const deferred = require('pull-defer') const toPullStream = require('async-iterator-to-pull-stream') -const { normalizePath } = require('./utils') module.exports = function (self) { return function catPullStream (ipfsPath, options) { - if (typeof ipfsPath === 'function') { - throw new Error('You must supply an ipfsPath') - } - - options = options || {} - - ipfsPath = normalizePath(ipfsPath) - const pathComponents = ipfsPath.split('/') - - if (options.preload !== false) { - self._preload(pathComponents[0]) - } - - const d = deferred.source() - - exporter(ipfsPath, self._ipld, options) - .then(file => { - // File may not have unixfs prop if small & imported with rawLeaves true - if (file.unixfs && file.unixfs.type.includes('dir')) { - return d.abort(new Error('this dag node is a directory')) - } - - if (!file.content) { - return d.abort(new Error('this dag node has no content')) - } - - d.resolve(toPullStream.source(file.content(options))) - }, err => { - d.abort(err) - }) - - return d + return toPullStream.source(self._catAsyncIterator(ipfsPath, options)) } } diff --git a/src/core/components/files-regular/cat-readable-stream.js b/src/core/components/files-regular/cat-readable-stream.js index b0c8c89029..70df087f16 100644 --- a/src/core/components/files-regular/cat-readable-stream.js +++ b/src/core/components/files-regular/cat-readable-stream.js @@ -1,7 +1,11 @@ 'use strict' -const toStream = require('pull-stream-to-stream') +const toStream = require('it-to-stream') module.exports = function (self) { - return (ipfsPath, options) => toStream.source(self.catPullStream(ipfsPath, options)) + return function catReadableStream (ipfsPath, options) { + return toStream.readable(self._catAsyncIterator(ipfsPath, options), { + objectMode: true + }) + } } diff --git a/src/core/components/files-regular/cat.js b/src/core/components/files-regular/cat.js index 6740f62d9f..656946ad03 100644 --- a/src/core/components/files-regular/cat.js +++ b/src/core/components/files-regular/cat.js @@ -1,21 +1,10 @@ 'use strict' -const promisify = require('promisify-es6') -const pull = require('pull-stream') +const callbackify = require('callbackify') +const all = require('async-iterator-all') module.exports = function (self) { - return promisify((ipfsPath, options, callback) => { - if (typeof options === 'function') { - callback = options - options = {} - } - - pull( - self.catPullStream(ipfsPath, options), - pull.collect((err, buffers) => { - if (err) { return callback(err) } - callback(null, Buffer.concat(buffers)) - }) - ) + return callbackify.variadic(async function cat (ipfsPath, options) { + return Buffer.concat(await all(self._catAsyncIterator(ipfsPath, options))) }) } diff --git a/src/core/components/files-regular/get-async-iterator.js b/src/core/components/files-regular/get-async-iterator.js new file mode 100644 index 0000000000..b9ad234f4b --- /dev/null +++ b/src/core/components/files-regular/get-async-iterator.js @@ -0,0 +1,30 @@ +'use strict' + +const exporter = require('ipfs-unixfs-exporter') +const errCode = require('err-code') +const { normalizePath, mapFile } = require('./utils') + +module.exports = function (self) { + return async function * getAsyncIterator (ipfsPath, options) { + options = options || {} + + if (options.preload !== false) { + let pathComponents + + try { + pathComponents = normalizePath(ipfsPath).split('/') + } catch (err) { + throw errCode(err, 'ERR_INVALID_PATH') + } + + self._preload(pathComponents[0]) + } + + for await (const file of exporter.recursive(ipfsPath, self._ipld, options)) { + yield mapFile(file, { + ...options, + includeContent: true + }) + } + } +} diff --git a/src/core/components/files-regular/get-pull-stream.js b/src/core/components/files-regular/get-pull-stream.js index 86bc2554db..fa769aa89c 100644 --- a/src/core/components/files-regular/get-pull-stream.js +++ b/src/core/components/files-regular/get-pull-stream.js @@ -1,35 +1,20 @@ 'use strict' -const exporter = require('ipfs-unixfs-exporter') const toPullStream = require('async-iterator-to-pull-stream') -const errCode = require('err-code') const pull = require('pull-stream/pull') -const pullError = require('pull-stream/sources/error') const map = require('pull-stream/throughs/map') -const { normalizePath, mapFile } = require('./utils') module.exports = function (self) { - return (ipfsPath, options) => { - options = options || {} - - if (options.preload !== false) { - let pathComponents - - try { - pathComponents = normalizePath(ipfsPath).split('/') - } catch (err) { - return pullError(errCode(err, 'ERR_INVALID_PATH')) - } - - self._preload(pathComponents[0]) - } - + return function getPullStream (ipfsPath, options) { return pull( - toPullStream.source(exporter.recursive(ipfsPath, self._ipld, options)), - map(mapFile({ - ...options, - includeContent: true - })) + toPullStream.source(self._getAsyncIterator(ipfsPath, options)), + map(file => { + if (file.content) { + file.content = toPullStream.source(file.content()) + } + + return file + }) ) } } diff --git a/src/core/components/files-regular/get-readable-stream.js b/src/core/components/files-regular/get-readable-stream.js index a8a53150ea..3b6b78b3dd 100644 --- a/src/core/components/files-regular/get-readable-stream.js +++ b/src/core/components/files-regular/get-readable-stream.js @@ -1,24 +1,19 @@ 'use strict' -const pull = require('pull-stream') -const toStream = require('pull-stream-to-stream') +const toStream = require('it-to-stream') module.exports = function (self) { - return (ipfsPath, options) => { - options = options || {} + return function getReadableStream (ipfsPath, options) { + return toStream.readable((async function * mapStreamFileContents () { + for await (const file of self._getAsyncIterator(ipfsPath, options)) { + if (file.content) { + file.content = toStream.readable(file.content()) + } - return toStream.source( - pull( - self.getPullStream(ipfsPath, options), - pull.map((file) => { - if (file.content) { - file.content = toStream.source(file.content) - file.content.pause() - } - - return file - }) - ) - ) + yield file + } + })(), { + objectMode: true + }) } } diff --git a/src/core/components/files-regular/get.js b/src/core/components/files-regular/get.js index a64577c222..58e0434dfc 100644 --- a/src/core/components/files-regular/get.js +++ b/src/core/components/files-regular/get.js @@ -1,34 +1,18 @@ 'use strict' -const promisify = require('promisify-es6') -const pull = require('pull-stream') +const callbackify = require('callbackify') +const all = require('async-iterator-all') module.exports = function (self) { - return promisify((ipfsPath, options, callback) => { - if (typeof options === 'function') { - callback = options - options = {} - } - - options = options || {} - - pull( - self.getPullStream(ipfsPath, options), - pull.asyncMap((file, cb) => { + return callbackify.variadic(async function get (ipfsPath, options) { // eslint-disable-line require-await + return all(async function * () { + for await (const file of self._getAsyncIterator(ipfsPath, options)) { if (file.content) { - pull( - file.content, - pull.collect((err, buffers) => { - if (err) { return cb(err) } - file.content = Buffer.concat(buffers) - cb(null, file) - }) - ) - } else { - cb(null, file) + file.content = Buffer.concat(await all(file.content())) } - }), - pull.collect(callback) - ) + + yield file + } + }()) }) } diff --git a/src/core/components/files-regular/index.js b/src/core/components/files-regular/index.js index 50e1f90ff4..8bff57d331 100644 --- a/src/core/components/files-regular/index.js +++ b/src/core/components/files-regular/index.js @@ -12,18 +12,23 @@ module.exports = (self) => { cat: require('./cat')(self), catPullStream: require('./cat-pull-stream')(self), catReadableStream: require('./cat-readable-stream')(self), + _catAsyncIterator: require('./cat-async-iterator')(self), get: require('./get')(self), getPullStream: require('./get-pull-stream')(self), getReadableStream: require('./get-readable-stream')(self), + _getAsyncIterator: require('./get-async-iterator')(self), ls: require('./ls')(self), lsPullStream: require('./ls-pull-stream')(self), lsReadableStream: require('./ls-readable-stream')(self), + _lsAsyncIterator: require('./ls-async-iterator')(self), refs: require('./refs')(self), refsReadableStream: require('./refs-readable-stream')(self), - refsPullStream: require('./refs-pull-stream')(self) + refsPullStream: require('./refs-pull-stream')(self), + _refsAsyncIterator: require('./refs-async-iterator')(self) } filesRegular.refs.local = require('./refs-local')(self) filesRegular.refs.localReadableStream = require('./refs-local-readable-stream')(self) filesRegular.refs.localPullStream = require('./refs-local-pull-stream')(self) + filesRegular.refs._localAsyncIterator = require('./refs-local-async-iterator')(self) return filesRegular } diff --git a/src/core/components/files-regular/ls-async-iterator.js b/src/core/components/files-regular/ls-async-iterator.js new file mode 100644 index 0000000000..34777a523e --- /dev/null +++ b/src/core/components/files-regular/ls-async-iterator.js @@ -0,0 +1,54 @@ +'use strict' + +const exporter = require('ipfs-unixfs-exporter') +const errCode = require('err-code') +const { normalizePath, mapFile } = require('./utils') + +module.exports = function (self) { + return async function * lsAsyncIterator (ipfsPath, options) { + options = options || {} + + const path = normalizePath(ipfsPath) + const recursive = options.recursive + const pathComponents = path.split('/') + + if (options.preload !== false) { + self._preload(pathComponents[0]) + } + + const file = await exporter(ipfsPath, self._ipld, options) + + if (!file.unixfs) { + throw errCode(new Error('dag node was not a UnixFS node'), 'ERR_NOT_UNIXFS') + } + + if (file.unixfs.type === 'file') { + return mapFile(file, options) + } + + if (file.unixfs.type.includes('dir')) { + if (recursive) { + for await (const child of exporter.recursive(file.cid, self._ipld, options)) { + if (file.cid.toBaseEncodedString() === child.cid.toBaseEncodedString()) { + continue + } + + yield mapFile(child, options) + } + + return + } + + for await (let child of file.content()) { + child = mapFile(child, options) + child.depth-- + + yield child + } + + return + } + + throw errCode(new Error(`Unknown UnixFS type ${file.unixfs.type}`), 'ERR_UNKNOWN_UNIXFS_TYPE') + } +} diff --git a/src/core/components/files-regular/ls-pull-stream.js b/src/core/components/files-regular/ls-pull-stream.js index e1ec58ce10..3cf5a6cd74 100644 --- a/src/core/components/files-regular/ls-pull-stream.js +++ b/src/core/components/files-regular/ls-pull-stream.js @@ -1,64 +1,9 @@ 'use strict' -const exporter = require('ipfs-unixfs-exporter') -const deferred = require('pull-defer') -const pull = require('pull-stream/pull') -const once = require('pull-stream/sources/once') -const map = require('pull-stream/throughs/map') -const filter = require('pull-stream/throughs/filter') -const errCode = require('err-code') const toPullStream = require('async-iterator-to-pull-stream') -const { normalizePath, mapFile } = require('./utils') module.exports = function (self) { - return function (ipfsPath, options) { - options = options || {} - - const path = normalizePath(ipfsPath) - const recursive = options.recursive - const pathComponents = path.split('/') - - if (options.preload !== false) { - self._preload(pathComponents[0]) - } - - const d = deferred.source() - - exporter(ipfsPath, self._ipld, options) - .then(file => { - if (!file.unixfs) { - return d.abort(errCode(new Error('dag node was not a UnixFS node'), 'ENOTUNIXFS')) - } - - if (file.unixfs.type === 'file') { - return d.resolve(once(mapFile(options)(file))) - } - - if (file.unixfs.type.includes('dir')) { - if (recursive) { - return d.resolve(pull( - toPullStream.source(exporter.recursive(file.cid, self._ipld, options)), - filter(child => file.cid.toBaseEncodedString() !== child.cid.toBaseEncodedString()), - map(mapFile(options)) - )) - } - - return d.resolve(pull( - toPullStream.source(file.content()), - map(mapFile(options)), - map((file) => { - file.depth-- - - return file - }) - )) - } - - d.abort(errCode(new Error(`Unknown UnixFS type ${file.unixfs.type}`), 'EUNKNOWNUNIXFSTYPE')) - }, err => { - d.abort(err) - }) - - return d + return function lsPullStream (ipfsPath, options) { + return toPullStream.source(self._lsAsyncIterator(ipfsPath, options)) } } diff --git a/src/core/components/files-regular/ls-readable-stream.js b/src/core/components/files-regular/ls-readable-stream.js index 1b02eef0bc..794095f752 100644 --- a/src/core/components/files-regular/ls-readable-stream.js +++ b/src/core/components/files-regular/ls-readable-stream.js @@ -1,9 +1,11 @@ 'use strict' -const toStream = require('pull-stream-to-stream') +const toStream = require('it-to-stream') module.exports = function (self) { - return (ipfsPath, options) => { - return toStream.source(self.lsPullStream(ipfsPath, options)) + return function lsReadableStream (ipfsPath, options) { + return toStream.readable(self._lsAsyncIterator(ipfsPath, options), { + objectMode: true + }) } } diff --git a/src/core/components/files-regular/ls.js b/src/core/components/files-regular/ls.js index 42e6ff04c9..9ae4a71a97 100644 --- a/src/core/components/files-regular/ls.js +++ b/src/core/components/files-regular/ls.js @@ -1,25 +1,10 @@ 'use strict' -const promisify = require('promisify-es6') -const pull = require('pull-stream') +const callbackify = require('callbackify') +const all = require('async-iterator-all') module.exports = function (self) { - return promisify((ipfsPath, options, callback) => { - if (typeof options === 'function') { - callback = options - options = {} - } - - options = options || {} - - pull( - self.lsPullStream(ipfsPath, options), - pull.collect((err, values) => { - if (err) { - return callback(err) - } - callback(null, values) - }) - ) + return callbackify.variadic(async function ls (ipfsPath, options) { // eslint-disable-line require-await + return all(self._lsAsyncIterator(ipfsPath, options)) }) } diff --git a/src/core/components/files-regular/refs-async-iterator.js b/src/core/components/files-regular/refs-async-iterator.js new file mode 100644 index 0000000000..0d3dbe08d3 --- /dev/null +++ b/src/core/components/files-regular/refs-async-iterator.js @@ -0,0 +1,157 @@ +'use strict' + +const isIpfs = require('is-ipfs') +const CID = require('cids') +const { DAGNode } = require('ipld-dag-pb') +const { normalizePath } = require('./utils') +const { Format } = require('./refs') +const { Errors } = require('interface-datastore') +const ERR_NOT_FOUND = Errors.notFoundError().code + +module.exports = function (self) { + return async function * refsAsyncIterator (ipfsPath, options) { // eslint-disable-line require-await + options = options || {} + + if (options.maxDepth === 0) { + return + } + + if (options.edges && options.format && options.format !== Format.default) { + throw new Error('Cannot set edges to true and also specify format') + } + + options.format = options.edges ? Format.edges : options.format || Format.default + + if (typeof options.maxDepth !== 'number') { + options.maxDepth = options.recursive ? Infinity : 1 + } + + const rawPaths = Array.isArray(ipfsPath) ? ipfsPath : [ipfsPath] + const paths = rawPaths.map(p => getFullPath(self, p, options)) + + for (const path of paths) { + yield * refsStream(self, path, options) + } + } +} + +function getFullPath (ipfs, ipfsPath, options) { + // normalizePath() strips /ipfs/ off the front of the path so the CID will + // be at the front of the path + const path = normalizePath(ipfsPath) + const pathComponents = path.split('/') + const cid = pathComponents[0] + + if (!isIpfs.cid(cid)) { + throw new Error(`Error resolving path '${path}': '${cid}' is not a valid CID`) + } + + if (options.preload !== false) { + ipfs._preload(cid) + } + + return '/ipfs/' + path +} + +// Get a stream of refs at the given path +async function * refsStream (ipfs, path, options) { + // Resolve to the target CID of the path + const resPath = await ipfs.resolve(path) + // path is /ipfs/ + const parts = resPath.split('/') + const cid = parts[2] + + // Traverse the DAG, converting it into a stream + for await (const obj of objectStream(ipfs, cid, options.maxDepth, options.unique)) { + // Root object will not have a parent + if (!obj.parent) { + continue + } + + // Filter out duplicates (isDuplicate flag is only set if options.unique is set) + if (obj.isDuplicate) { + continue + } + + // Format the links + // Clients expect refs to be in the format { ref: } + yield { + ref: formatLink(obj.parent.cid, obj.node.cid, obj.node.name, options.format) + } + } +} + +// Get formatted link +function formatLink (srcCid, dstCid, linkName, format) { + let out = format.replace(//g, srcCid.toString()) + out = out.replace(//g, dstCid.toString()) + out = out.replace(//g, linkName) + return out +} + +// Do a depth first search of the DAG, starting from the given root cid +async function * objectStream (ipfs, rootCid, maxDepth, uniqueOnly) { // eslint-disable-line require-await + const seen = new Set() + + async function * traverseLevel (parent, depth) { + const nextLevelDepth = depth + 1 + + // Check the depth + if (nextLevelDepth > maxDepth) { + return + } + + // Get this object's links + try { + // Look at each link, parent and the new depth + for (const link of await getLinks(ipfs, parent.cid)) { + yield { + parent: parent, + node: link, + isDuplicate: uniqueOnly && seen.has(link.cid.toString()) + } + + if (uniqueOnly) { + seen.add(link.cid.toString()) + } + + yield * traverseLevel(link, nextLevelDepth) + } + } catch (err) { + if (err.code === ERR_NOT_FOUND) { + err.message = `Could not find object with CID: ${parent.cid}` + } + + throw err + } + } + + yield * traverseLevel({ cid: rootCid }, 0) +} + +// Fetch a node from IPLD then get all its links +async function getLinks (ipfs, cid) { + const node = await ipfs._ipld.get(new CID(cid)) + + if (DAGNode.isDAGNode(node)) { + return node.Links.map(({ Name, Hash }) => ({ name: Name, cid: new CID(Hash) })) + } + + return getNodeLinks(node) +} + +// Recursively search the node for CIDs +function getNodeLinks (node, path = '') { + let links = [] + for (const [name, value] of Object.entries(node)) { + if (CID.isCID(value)) { + links.push({ + name: path + name, + cid: value + }) + } else if (typeof value === 'object') { + links = links.concat(getNodeLinks(value, path + name + '/')) + } + } + return links +} diff --git a/src/core/components/files-regular/refs-local-async-iterator.js b/src/core/components/files-regular/refs-local-async-iterator.js new file mode 100644 index 0000000000..62029cbac9 --- /dev/null +++ b/src/core/components/files-regular/refs-local-async-iterator.js @@ -0,0 +1,25 @@ +'use strict' + +const CID = require('cids') +const base32 = require('base32.js') + +module.exports = function (self) { + return async function * refsLocalAsyncIterator () { + for await (const result of self._repo.blocks.query({ keysOnly: true })) { + yield dsKeyToRef(result.key) + } + } +} + +function dsKeyToRef (key) { + try { + // Block key is of the form / + const decoder = new base32.Decoder() + const buff = Buffer.from(decoder.write(key.toString().slice(1)).finalize()) + return { + ref: new CID(buff).toString() + } + } catch (err) { + return { err: `Could not convert block with key '${key}' to CID: ${err.message}` } + } +} diff --git a/src/core/components/files-regular/refs-local-pull-stream.js b/src/core/components/files-regular/refs-local-pull-stream.js index 77c396f58f..1bb02cec51 100644 --- a/src/core/components/files-regular/refs-local-pull-stream.js +++ b/src/core/components/files-regular/refs-local-pull-stream.js @@ -1,26 +1,9 @@ 'use strict' -const CID = require('cids') -const base32 = require('base32.js') -const itToPull = require('async-iterator-to-pull-stream') +const toPullStream = require('async-iterator-to-pull-stream') module.exports = function (self) { - return () => { - return itToPull((async function * () { - for await (const result of self._repo.blocks.query({ keysOnly: true })) { - yield dsKeyToRef(result.key) - } - })()) - } -} - -function dsKeyToRef (key) { - try { - // Block key is of the form / - const decoder = new base32.Decoder() - const buff = Buffer.from(decoder.write(key.toString().slice(1)).finalize()) - return { ref: new CID(buff).toString() } - } catch (err) { - return { err: `Could not convert block with key '${key}' to CID: ${err.message}` } + return function refsLocalPullStream () { + return toPullStream.source(self.refs._localAsyncIterator()) } } diff --git a/src/core/components/files-regular/refs-local-readable-stream.js b/src/core/components/files-regular/refs-local-readable-stream.js index b73eee29bf..f66920ef51 100644 --- a/src/core/components/files-regular/refs-local-readable-stream.js +++ b/src/core/components/files-regular/refs-local-readable-stream.js @@ -1,9 +1,11 @@ 'use strict' -const toStream = require('pull-stream-to-stream') +const toStream = require('it-to-stream') module.exports = function (self) { - return (ipfsPath, options) => { - return toStream.source(self.refs.localPullStream()) + return function refsLocalReadableStream () { + return toStream.readable(self.refs._localAsyncIterator(), { + objectMode: true + }) } } diff --git a/src/core/components/files-regular/refs-local.js b/src/core/components/files-regular/refs-local.js index 7d78388483..799b384e30 100644 --- a/src/core/components/files-regular/refs-local.js +++ b/src/core/components/files-regular/refs-local.js @@ -1,18 +1,10 @@ 'use strict' -const promisify = require('promisify-es6') -const pull = require('pull-stream') +const callbackify = require('callbackify') +const all = require('async-iterator-all') module.exports = function (self) { - return promisify((callback) => { - pull( - self.refs.localPullStream(), - pull.collect((err, values) => { - if (err) { - return callback(err) - } - callback(null, values) - }) - ) + return callbackify.variadic(async function refsLocal (ipfsPath, options) { // eslint-disable-line require-await + return all(self.refs._localAsyncIterator(ipfsPath, options)) }) } diff --git a/src/core/components/files-regular/refs-pull-stream.js b/src/core/components/files-regular/refs-pull-stream.js index d23ca53b6f..4d10812c37 100644 --- a/src/core/components/files-regular/refs-pull-stream.js +++ b/src/core/components/files-regular/refs-pull-stream.js @@ -1,185 +1,9 @@ 'use strict' -const pull = require('pull-stream') -const pullDefer = require('pull-defer') -const pullTraverse = require('pull-traverse') -const pullCat = require('pull-cat') -const isIpfs = require('is-ipfs') -const CID = require('cids') -const { DAGNode } = require('ipld-dag-pb') -const { normalizePath } = require('./utils') -const { Format } = require('./refs') - -const { Errors } = require('interface-datastore') -const ERR_NOT_FOUND = Errors.notFoundError().code +const toPullStream = require('async-iterator-to-pull-stream') module.exports = function (self) { - return function (ipfsPath, options) { - options = options || {} - - if (options.maxDepth === 0) { - return pull.empty() - } - if (options.edges && options.format && options.format !== Format.default) { - return pull.error(new Error('Cannot set edges to true and also specify format')) - } - - options.format = options.edges ? Format.edges : options.format || Format.default - - if (typeof options.maxDepth !== 'number') { - options.maxDepth = options.recursive ? Infinity : 1 - } - - let paths - try { - const rawPaths = Array.isArray(ipfsPath) ? ipfsPath : [ipfsPath] - paths = rawPaths.map(p => getFullPath(self, p, options)) - } catch (err) { - return pull.error(err) - } - - return pullCat(paths.map(p => refsStream(self, p, options))) - } -} - -function getFullPath (ipfs, ipfsPath, options) { - // normalizePath() strips /ipfs/ off the front of the path so the CID will - // be at the front of the path - const path = normalizePath(ipfsPath) - const pathComponents = path.split('/') - const cid = pathComponents[0] - if (!isIpfs.cid(cid)) { - throw new Error(`Error resolving path '${path}': '${cid}' is not a valid CID`) - } - - if (options.preload !== false) { - ipfs._preload(cid) - } - - return '/ipfs/' + path -} - -// Get a stream of refs at the given path -function refsStream (ipfs, path, options) { - const deferred = pullDefer.source() - - // Resolve to the target CID of the path - ipfs.resolve(path, (err, resPath) => { - if (err) { - return deferred.resolve(pull.error(err)) - } - - // path is /ipfs/ - const parts = resPath.split('/') - const cid = parts[2] - deferred.resolve(pull( - // Traverse the DAG, converting it into a stream - objectStream(ipfs, cid, options.maxDepth, options.unique), - // Root object will not have a parent - pull.filter(obj => Boolean(obj.parent)), - // Filter out duplicates (isDuplicate flag is only set if options.unique is set) - pull.filter(obj => !obj.isDuplicate), - // Format the links - pull.map(obj => formatLink(obj.parent.cid, obj.node.cid, obj.node.name, options.format)), - // Clients expect refs to be in the format { ref: } - pull.map(ref => ({ ref })) - )) - }) - - return deferred -} - -// Get formatted link -function formatLink (srcCid, dstCid, linkName, format) { - let out = format.replace(//g, srcCid.toString()) - out = out.replace(//g, dstCid.toString()) - out = out.replace(//g, linkName) - return out -} - -// Do a depth first search of the DAG, starting from the given root cid -function objectStream (ipfs, rootCid, maxDepth, isUnique) { - const uniques = new Set() - - const root = { node: { cid: rootCid }, depth: 0 } - const traverseLevel = (obj) => { - const { node, depth } = obj - - // Check the depth - const nextLevelDepth = depth + 1 - if (nextLevelDepth > maxDepth) { - return pull.empty() - } - - // If unique option is enabled, check if the CID has been seen before. - // Note we need to do this here rather than before adding to the stream - // so that the unique check happens in the order that items are examined - // in the DAG. - if (isUnique) { - if (uniques.has(node.cid.toString())) { - // Mark this object as a duplicate so we can filter it out later - obj.isDuplicate = true - return pull.empty() - } - uniques.add(node.cid.toString()) - } - - const deferred = pullDefer.source() - - // Get this object's links - getLinks(ipfs, node.cid, (err, links) => { - if (err) { - if (err.code === ERR_NOT_FOUND) { - err.message = `Could not find object with CID: ${node.cid}` - } - return deferred.resolve(pull.error(err)) - } - - // Add to the stream each link, parent and the new depth - const vals = links.map(link => ({ - parent: node, - node: link, - depth: nextLevelDepth - })) - - deferred.resolve(pull.values(vals)) - }) - - return deferred - } - - return pullTraverse.depthFirst(root, traverseLevel) -} - -// Fetch a node from IPLD then get all its links -function getLinks (ipfs, cid, callback) { - ipfs._ipld.get(new CID(cid)) - .then(node => { - let links - if (DAGNode.isDAGNode(node)) { - links = node.Links.map(({ Name, Hash }) => { - return { name: Name, cid: new CID(Hash) } - }) - } else { - links = getNodeLinks(node) - } - callback(null, links) - }) - .catch(callback) -} - -// Recursively search the node for CIDs -function getNodeLinks (node, path = '') { - let links = [] - for (const [name, value] of Object.entries(node)) { - if (CID.isCID(value)) { - links.push({ - name: path + name, - cid: value - }) - } else if (typeof value === 'object') { - links = links.concat(getNodeLinks(value, path + name + '/')) - } + return function refsPullStream (ipfsPath, options) { + return toPullStream.source(self._refsAsyncIterator(ipfsPath, options)) } - return links } diff --git a/src/core/components/files-regular/refs-readable-stream.js b/src/core/components/files-regular/refs-readable-stream.js index 4c09a9f952..509e65a508 100644 --- a/src/core/components/files-regular/refs-readable-stream.js +++ b/src/core/components/files-regular/refs-readable-stream.js @@ -1,9 +1,11 @@ 'use strict' -const toStream = require('pull-stream-to-stream') +const toStream = require('it-to-stream') module.exports = function (self) { - return (ipfsPath, options) => { - return toStream.source(self.refsPullStream(ipfsPath, options)) + return function refsReadableStream (ipfsPath, options) { + return toStream.readable(self._refsAsyncIterator(ipfsPath, options), { + objectMode: true + }) } } diff --git a/src/core/components/files-regular/refs.js b/src/core/components/files-regular/refs.js index 4b182278a1..4876457606 100644 --- a/src/core/components/files-regular/refs.js +++ b/src/core/components/files-regular/refs.js @@ -1,26 +1,11 @@ 'use strict' -const promisify = require('promisify-es6') -const pull = require('pull-stream') +const callbackify = require('callbackify') +const all = require('async-iterator-all') module.exports = function (self) { - return promisify((ipfsPath, options, callback) => { - if (typeof options === 'function') { - callback = options - options = {} - } - - options = options || {} - - pull( - self.refsPullStream(ipfsPath, options), - pull.collect((err, values) => { - if (err) { - return callback(err) - } - callback(null, values) - }) - ) + return callbackify.variadic(async function refs (ipfsPath, options) { // eslint-disable-line require-await + return all(self._refsAsyncIterator(ipfsPath, options)) }) } diff --git a/src/core/components/files-regular/utils.js b/src/core/components/files-regular/utils.js index bd5afc5fd2..876d0b0d48 100644 --- a/src/core/components/files-regular/utils.js +++ b/src/core/components/files-regular/utils.js @@ -3,7 +3,6 @@ const CID = require('cids') const { Buffer } = require('buffer') const { cidToString } = require('../../../utils/cid') -const toPullStream = require('async-iterator-to-pull-stream') const normalizePath = (path) => { if (Buffer.isBuffer(path)) { @@ -101,33 +100,31 @@ const parseChunkSize = (str, name) => { return size } -const mapFile = (options) => { +const mapFile = (file, options) => { options = options || {} - return (file) => { - let size = 0 - let type = 'dir' + let size = 0 + let type = 'dir' - if (file.unixfs && file.unixfs.type === 'file') { - size = file.unixfs.fileSize() - type = 'file' - } - - const output = { - hash: cidToString(file.cid, { base: options.cidBase }), - path: file.path, - name: file.name, - depth: file.path.split('/').length, - size, - type - } + if (file.unixfs && file.unixfs.type === 'file') { + size = file.unixfs.fileSize() + type = 'file' + } - if (options.includeContent && file.unixfs && file.unixfs.type === 'file') { - output.content = toPullStream.source(file.content()) - } + const output = { + hash: cidToString(file.cid, { base: options.cidBase }), + path: file.path, + name: file.name, + depth: file.path.split('/').length, + size, + type + } - return output + if (options.includeContent && file.unixfs && file.unixfs.type === 'file') { + output.content = file.content } + + return output } module.exports = { diff --git a/src/http/api/resources/files-regular.js b/src/http/api/resources/files-regular.js index d71842b3ac..bc963ce3d1 100644 --- a/src/http/api/resources/files-regular.js +++ b/src/http/api/resources/files-regular.js @@ -8,10 +8,8 @@ log.error = debug('ipfs:http-api:files:error') const pull = require('pull-stream') const pushable = require('pull-pushable') const toStream = require('pull-stream-to-stream') -const abortable = require('pull-abortable') const Joi = require('@hapi/joi') const Boom = require('@hapi/boom') -const ndjson = require('pull-ndjson') const { PassThrough } = require('readable-stream') const multibase = require('multibase') const isIpfs = require('is-ipfs') @@ -330,20 +328,30 @@ exports.refs = { const { ipfs } = request.server.app const { key } = request.pre.args - const recursive = request.query.recursive - const format = request.query.format - const edges = request.query.edges - const unique = request.query.unique - const maxDepth = request.query['max-depth'] + const options = { + recursive: request.query.recursive, + format: request.query.format, + edges: request.query.edges, + unique: request.query.unique, + maxDepth: request.query['max-depth'] + } // have to do this here otherwise the validation error appears in the stream tail and // this doesn't work in browsers: https://github.com/ipfs/js-ipfs/issues/2519 - if (edges && format !== Format.default) { + if (options.edges && options.format !== Format.default) { throw Boom.badRequest('Cannot set edges to true and also specify format') } - const source = ipfs.refsPullStream(key, { recursive, format, edges, unique, maxDepth }) - return sendRefsReplyStream(request, h, `refs for ${key}`, source) + return streamResponse(request, h, async (output) => { + for await (const ref of ipfs._refsAsyncIterator(key, options)) { + output.write( + JSON.stringify({ + Ref: ref.ref, + Err: ref.err + }) + '\n' + ) + } + }) } } @@ -351,54 +359,40 @@ exports.refs.local = { // main route handler handler (request, h) { const { ipfs } = request.server.app - const source = ipfs.refs.localPullStream() - return sendRefsReplyStream(request, h, 'local refs', source) - } -} -function sendRefsReplyStream (request, h, desc, source) { - const replyStream = pushable() - const aborter = abortable() - - const stream = toStream.source(pull( - replyStream, - aborter, - ndjson.serialize() - )) - - // const stream = toStream.source(replyStream.source) - // hapi is not very clever and throws if no - // - _read method - // - _readableState object - // are there :( - if (!stream._read) { - stream._read = () => {} - stream._readableState = {} - stream.unpipe = () => {} + return streamResponse(request, h, async (output) => { + for await (const ref of ipfs.refs._localAsyncIterator()) { + output.write( + JSON.stringify({ + Ref: ref.ref, + Err: ref.err + }) + '\n' + ) + } + }) } +} - pull( - source, - pull.drain( - (ref) => replyStream.push({ Ref: ref.ref, Err: ref.err }), - (err) => { - if (err) { - request.raw.res.addTrailers({ - 'X-Stream-Error': JSON.stringify({ - Message: `Failed to get ${desc}: ${err.message || ''}`, - Code: 0 - }) - }) - return aborter.abort() - } - - replyStream.end() - } - ) - ) +function streamResponse (request, h, fn) { + const output = new PassThrough() + const errorTrailer = 'X-Stream-Error' + + Promise.resolve() + .then(() => fn(output)) + .catch(err => { + request.raw.res.addTrailers({ + [errorTrailer]: JSON.stringify({ + Message: err.message, + Code: 0 + }) + }) + }) + .finally(() => { + output.end() + }) - return h.response(stream) + return h.response(output) .header('x-chunked-output', '1') .header('content-type', 'application/json') - .header('Trailer', 'X-Stream-Error') + .header('Trailer', errorTrailer) } diff --git a/test/cli/files.js b/test/cli/files.js index aef70f4a8b..4858cc0cb0 100644 --- a/test/cli/files.js +++ b/test/cli/files.js @@ -360,13 +360,6 @@ describe('files', () => runOnAndOff((thing) => { expect(out).to.eql(readme) }) - it('cat alias', async function () { - this.timeout(20 * 1000) - - const out = await ipfs('cat QmPZ9gcCEpqKTo6aq61g2nXGUhM4iCL3ewB6LDXZCtioEB') - expect(out).to.eql(readme) - }) - it('cat part of a file using `count`', async function () { this.timeout(30 * 1000) diff --git a/test/core/files.spec.js b/test/core/files.spec.js index b620306c17..ea8ca2380a 100644 --- a/test/core/files.spec.js +++ b/test/core/files.spec.js @@ -49,6 +49,7 @@ describe('files', function () { const invalidPath = null const stream = ipfs.getReadableStream(invalidPath) + stream.on('data', () => {}) stream.on('error', (err) => { expect(err).to.exist() expect(err.code).to.equal('ERR_INVALID_PATH') diff --git a/test/utils/ipfs-exec.js b/test/utils/ipfs-exec.js index 41f11a6fe2..52e669913e 100644 --- a/test/utils/ipfs-exec.js +++ b/test/utils/ipfs-exec.js @@ -21,7 +21,8 @@ module.exports = (repoPath, opts) => { const config = Object.assign({}, { stripFinalNewline: false, env: env, - timeout: 60 * 1000 + timeout: 60 * 1000, + all: true }, opts) const exec = (args, options) => { const opts = Object.assign({}, config, options) @@ -57,6 +58,7 @@ module.exports = (repoPath, opts) => { res.stdin = cp.stdin res.stdout = cp.stdout res.stderr = cp.stderr + res.all = cp.all return res }