Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

chore: converts remaining file api methods to async iterators #2517

Merged
merged 12 commits into from
Oct 23, 2019
11 changes: 6 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,13 @@
"array-shuffle": "^1.0.1",
"async": "^2.6.1",
"async-iterator-all": "^1.0.0",
"async-iterator-first": "^1.0.0",
"async-iterator-to-pull-stream": "^1.3.0",
"async-iterator-to-stream": "^1.1.0",
"base32.js": "~0.1.0",
"bignumber.js": "^9.0.0",
"binary-querystring": "~0.1.2",
"bl": "^3.0.0",
"bl": "^4.0.0",
"bs58": "^4.0.1",
"buffer-peek-stream": "^1.0.1",
"byteman": "^1.3.5",
Expand All @@ -99,7 +100,7 @@
"ipfs-bitswap": "^0.26.0",
"ipfs-block": "~0.8.1",
"ipfs-block-service": "~0.16.0",
"ipfs-http-client": "^38.2.0",
"ipfs-http-client": "^39.0.0",
"ipfs-http-response": "~0.3.1",
"ipfs-mfs": "^0.13.0",
"ipfs-multipart": "^0.2.0",
Expand Down Expand Up @@ -127,7 +128,7 @@
"jsondiffpatch": "~0.3.11",
"just-safe-set": "^2.1.0",
"kind-of": "^6.0.2",
"ky": "^0.14.0",
"ky": "^0.15.0",
"ky-universal": "~0.3.0",
"libp2p": "^0.26.2",
"libp2p-bootstrap": "~0.9.3",
Expand Down Expand Up @@ -178,7 +179,7 @@
"pull-sort": "^1.0.1",
"pull-stream": "^3.6.14",
"pull-stream-to-async-iterator": "^1.0.2",
"pull-stream-to-stream": "^1.3.4",
"pull-stream-to-stream": "^2.0.0",
"pull-traverse": "^1.0.3",
"readable-stream": "^3.4.0",
"receptacle": "^1.3.2",
Expand All @@ -200,7 +201,7 @@
"delay": "^4.1.0",
"detect-node": "^2.0.4",
"dir-compare": "^1.7.3",
"execa": "^2.0.4",
"execa": "^3.0.0",
"form-data": "^2.5.1",
"hat": "0.0.3",
"interface-ipfs-core": "^0.117.2",
Expand Down
11 changes: 3 additions & 8 deletions src/cli/commands/cat.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,9 @@ module.exports = {
resolve((async () => {
const ipfs = await getIpfs()

return new Promise((resolve, reject) => {
const stream = ipfs.catReadableStream(ipfsPath, { offset, length })

stream.on('error', reject)
stream.on('end', resolve)

stream.pipe(process.stdout)
})
for await (const buf of ipfs._catAsyncIterator(ipfsPath, { offset, length })) {
process.stdout.write(buf)
}
})())
}
}
2 changes: 1 addition & 1 deletion src/core/components/files-regular/add.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const all = require('async-iterator-all')
module.exports = function (self) {
// can't use callbackify because if `data` is a pull stream
// it thinks we are passing a callback. This is why we can't have nice things.
return (data, options, callback) => {
return function add (data, options, callback) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why move from arrow func to named func here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To have function names in any stack traces emitted making them a teensy bit easier to read.

if (!callback && typeof options === 'function') {
callback = options
options = {}
Expand Down
30 changes: 30 additions & 0 deletions src/core/components/files-regular/cat-async-iterator.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
'use strict'

const exporter = require('ipfs-unixfs-exporter')
const { normalizePath } = require('./utils')

module.exports = function (self) {
return async function * catAsyncIterator (ipfsPath, options) {
options = options || {}

ipfsPath = normalizePath(ipfsPath)

if (options.preload !== false) {
const pathComponents = ipfsPath.split('/')
self._preload(pathComponents[0])
}

const file = await exporter(ipfsPath, self._ipld, options)

// File may not have unixfs prop if small & imported with rawLeaves true
if (file.unixfs && file.unixfs.type.includes('dir')) {
throw new Error('this dag node is a directory')
}

if (!file.content) {
throw new Error('this dag node has no content')
}

yield * file.content(options)
}
}
36 changes: 1 addition & 35 deletions src/core/components/files-regular/cat-pull-stream.js
Original file line number Diff line number Diff line change
@@ -1,43 +1,9 @@
'use strict'

const exporter = require('ipfs-unixfs-exporter')
const deferred = require('pull-defer')
const toPullStream = require('async-iterator-to-pull-stream')
const { normalizePath } = require('./utils')

module.exports = function (self) {
return function catPullStream (ipfsPath, options) {
if (typeof ipfsPath === 'function') {
throw new Error('You must supply an ipfsPath')
}

options = options || {}

ipfsPath = normalizePath(ipfsPath)
const pathComponents = ipfsPath.split('/')

if (options.preload !== false) {
self._preload(pathComponents[0])
}

const d = deferred.source()

exporter(ipfsPath, self._ipld, options)
.then(file => {
// File may not have unixfs prop if small & imported with rawLeaves true
if (file.unixfs && file.unixfs.type.includes('dir')) {
return d.abort(new Error('this dag node is a directory'))
}

if (!file.content) {
return d.abort(new Error('this dag node has no content'))
}

d.resolve(toPullStream.source(file.content(options)))
}, err => {
d.abort(err)
})

return d
return toPullStream.source(self._catAsyncIterator(ipfsPath, options))
}
}
8 changes: 6 additions & 2 deletions src/core/components/files-regular/cat-readable-stream.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
'use strict'

const toStream = require('pull-stream-to-stream')
const toStream = require('it-to-stream')

module.exports = function (self) {
return (ipfsPath, options) => toStream.source(self.catPullStream(ipfsPath, options))
return function catReadableStream (ipfsPath, options) {
return toStream.readable(self._catAsyncIterator(ipfsPath, options), {
objectMode: true
})
}
}
19 changes: 4 additions & 15 deletions src/core/components/files-regular/cat.js
Original file line number Diff line number Diff line change
@@ -1,21 +1,10 @@
'use strict'

const promisify = require('promisify-es6')
const pull = require('pull-stream')
const callbackify = require('callbackify')
const all = require('async-iterator-all')

module.exports = function (self) {
return promisify((ipfsPath, options, callback) => {
if (typeof options === 'function') {
callback = options
options = {}
}

pull(
self.catPullStream(ipfsPath, options),
pull.collect((err, buffers) => {
if (err) { return callback(err) }
callback(null, Buffer.concat(buffers))
})
)
return callbackify.variadic(async function cat (ipfsPath, options) {
return Buffer.concat(await all(self._catAsyncIterator(ipfsPath, options)))
})
}
30 changes: 30 additions & 0 deletions src/core/components/files-regular/get-async-iterator.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
'use strict'

const exporter = require('ipfs-unixfs-exporter')
const errCode = require('err-code')
const { normalizePath, mapFile } = require('./utils')

module.exports = function (self) {
return async function * getAsyncIterator (ipfsPath, options) {
options = options || {}

if (options.preload !== false) {
let pathComponents

try {
pathComponents = normalizePath(ipfsPath).split('/')
} catch (err) {
throw errCode(err, 'ERR_INVALID_PATH')
}

self._preload(pathComponents[0])
}

for await (const file of exporter.recursive(ipfsPath, self._ipld, options)) {
yield mapFile(file, {
...options,
includeContent: true
})
}
}
}
33 changes: 9 additions & 24 deletions src/core/components/files-regular/get-pull-stream.js
Original file line number Diff line number Diff line change
@@ -1,35 +1,20 @@
'use strict'

const exporter = require('ipfs-unixfs-exporter')
const toPullStream = require('async-iterator-to-pull-stream')
const errCode = require('err-code')
const pull = require('pull-stream/pull')
const pullError = require('pull-stream/sources/error')
const map = require('pull-stream/throughs/map')
const { normalizePath, mapFile } = require('./utils')

module.exports = function (self) {
return (ipfsPath, options) => {
options = options || {}

if (options.preload !== false) {
let pathComponents

try {
pathComponents = normalizePath(ipfsPath).split('/')
} catch (err) {
return pullError(errCode(err, 'ERR_INVALID_PATH'))
}

self._preload(pathComponents[0])
}

return function getPullStream (ipfsPath, options) {
return pull(
toPullStream.source(exporter.recursive(ipfsPath, self._ipld, options)),
map(mapFile({
...options,
includeContent: true
}))
toPullStream.source(self._getAsyncIterator(ipfsPath, options)),
map(file => {
if (file.content) {
file.content = toPullStream.source(file.content())
}

return file
})
)
}
}
29 changes: 12 additions & 17 deletions src/core/components/files-regular/get-readable-stream.js
Original file line number Diff line number Diff line change
@@ -1,24 +1,19 @@
'use strict'

const pull = require('pull-stream')
const toStream = require('pull-stream-to-stream')
const toStream = require('it-to-stream')

module.exports = function (self) {
return (ipfsPath, options) => {
options = options || {}
return function getReadableStream (ipfsPath, options) {
return toStream.readable((async function * mapStreamFileContents () {
for await (const file of self._getAsyncIterator(ipfsPath, options)) {
if (file.content) {
file.content = toStream.readable(file.content())
}

return toStream.source(
pull(
self.getPullStream(ipfsPath, options),
pull.map((file) => {
if (file.content) {
file.content = toStream.source(file.content)
file.content.pause()
}

return file
})
)
)
yield file
}
})(), {
objectMode: true
})
}
}
36 changes: 10 additions & 26 deletions src/core/components/files-regular/get.js
Original file line number Diff line number Diff line change
@@ -1,34 +1,18 @@
'use strict'

const promisify = require('promisify-es6')
const pull = require('pull-stream')
const callbackify = require('callbackify')
const all = require('async-iterator-all')

module.exports = function (self) {
return promisify((ipfsPath, options, callback) => {
if (typeof options === 'function') {
callback = options
options = {}
}

options = options || {}

pull(
self.getPullStream(ipfsPath, options),
pull.asyncMap((file, cb) => {
return callbackify.variadic(async function get (ipfsPath, options) { // eslint-disable-line require-await
return all(async function * () {
for await (const file of self._getAsyncIterator(ipfsPath, options)) {
if (file.content) {
pull(
file.content,
pull.collect((err, buffers) => {
if (err) { return cb(err) }
file.content = Buffer.concat(buffers)
cb(null, file)
})
)
} else {
cb(null, file)
file.content = Buffer.concat(await all(file.content()))
}
}),
pull.collect(callback)
)

yield file
}
}())
})
}
7 changes: 6 additions & 1 deletion src/core/components/files-regular/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,23 @@ module.exports = (self) => {
cat: require('./cat')(self),
catPullStream: require('./cat-pull-stream')(self),
catReadableStream: require('./cat-readable-stream')(self),
_catAsyncIterator: require('./cat-async-iterator')(self),
get: require('./get')(self),
getPullStream: require('./get-pull-stream')(self),
getReadableStream: require('./get-readable-stream')(self),
_getAsyncIterator: require('./get-async-iterator')(self),
ls: require('./ls')(self),
lsPullStream: require('./ls-pull-stream')(self),
lsReadableStream: require('./ls-readable-stream')(self),
_lsAsyncIterator: require('./ls-async-iterator')(self),
refs: require('./refs')(self),
refsReadableStream: require('./refs-readable-stream')(self),
refsPullStream: require('./refs-pull-stream')(self)
refsPullStream: require('./refs-pull-stream')(self),
_refsAsyncIterator: require('./refs-async-iterator')(self)
}
filesRegular.refs.local = require('./refs-local')(self)
filesRegular.refs.localReadableStream = require('./refs-local-readable-stream')(self)
filesRegular.refs.localPullStream = require('./refs-local-pull-stream')(self)
filesRegular.refs._localAsyncIterator = require('./refs-local-async-iterator')(self)
return filesRegular
}
Loading