From 7e9a0ee312cc033cab6d4084684aca1bff0336e4 Mon Sep 17 00:00:00 2001
From: achingbrain
Date: Fri, 4 Oct 2019 08:07:35 +0100
Subject: [PATCH] chore: converts remaining files api to async iterators

---
 package.json                                |  1 +
 src/files-regular/get-pull-stream.js        | 44 ---------
 src/files-regular/get-readable-stream.js    | 34 -------
 src/files-regular/get.js                    | 70 +++++++------
 src/files-regular/index.js                  | 83 +++++++++++-----
 src/files-regular/ls-pull-stream.js         | 74 --------------
 src/files-regular/ls-readable-stream.js     | 72 --------------
 src/files-regular/ls.js                     | 90 +++++++++--------
 src/files-regular/refs-local-pull-stream.js | 27 -----
 .../refs-local-readable-stream.js           | 25 -----
 src/files-regular/refs-local.js             | 42 +++-----
 src/files-regular/refs-pull-stream.js       | 34 -------
 src/files-regular/refs-readable-stream.js   | 32 ------
 src/files-regular/refs.js                   | 98 ++++++++----------
 src/utils/tar-stream-to-objects.js          | 99 +++++++++++++------
 test/get.spec.js                            | 17 +---
 16 files changed, 274 insertions(+), 568 deletions(-)
 delete mode 100644 src/files-regular/get-pull-stream.js
 delete mode 100644 src/files-regular/get-readable-stream.js
 delete mode 100644 src/files-regular/ls-pull-stream.js
 delete mode 100644 src/files-regular/ls-readable-stream.js
 delete mode 100644 src/files-regular/refs-local-pull-stream.js
 delete mode 100644 src/files-regular/refs-local-readable-stream.js
 delete mode 100644 src/files-regular/refs-pull-stream.js
 delete mode 100644 src/files-regular/refs-readable-stream.js

diff --git a/package.json b/package.json
index 7dfe6848a..4251789de 100644
--- a/package.json
+++ b/package.json
@@ -56,6 +56,7 @@
     "detect-node": "^2.0.4",
     "end-of-stream": "^1.4.1",
     "err-code": "^2.0.0",
+    "event-iterator": "^1.2.0",
     "explain-error": "^1.0.4",
     "flatmap": "0.0.3",
     "form-data": "^2.5.1",
diff --git a/src/files-regular/get-pull-stream.js b/src/files-regular/get-pull-stream.js
deleted file mode 100644
index 4b2ede770..000000000
--- a/src/files-regular/get-pull-stream.js
+++ /dev/null
@@ -1,44 +0,0 @@
-'use strict'
-
-const cleanCID = require('../utils/clean-cid')
-const TarStreamToObjects = require('../utils/tar-stream-to-objects')
-const v = require('is-ipfs')
-const pull = require('pull-stream/pull')
-const map = require('pull-stream/throughs/map')
-const toPull = require('stream-to-pull-stream')
-const deferred = require('pull-defer')
-
-module.exports = (send) => {
-  return (path, opts) => {
-    opts = opts || {}
-
-    const p = deferred.source()
-
-    try {
-      path = cleanCID(path)
-    } catch (err) {
-      if (!v.ipfsPath(path)) {
-        return p.end(err)
-      }
-    }
-
-    const request = { path: 'get', args: path, qs: opts }
-
-    // Convert the response stream to TarStream objects
-    send.andTransform(request, TarStreamToObjects, (err, stream) => {
-      if (err) { return p.end(err) }
-
-      p.resolve(
-        pull(
-          toPull.source(stream),
-          map(file => {
-            const { path, content } = file
-            return content ? { path, content: toPull.source(content) } : file
-          })
-        )
-      )
-    })
-
-    return p
-  }
-}
diff --git a/src/files-regular/get-readable-stream.js b/src/files-regular/get-readable-stream.js
deleted file mode 100644
index 0bace519f..000000000
--- a/src/files-regular/get-readable-stream.js
+++ /dev/null
@@ -1,34 +0,0 @@
-'use strict'
-
-const cleanCID = require('../utils/clean-cid')
-const TarStreamToObjects = require('../utils/tar-stream-to-objects')
-const v = require('is-ipfs')
-const Stream = require('readable-stream')
-const pump = require('pump')
-
-module.exports = (send) => {
-  return (path, opts) => {
-    opts = opts || {}
-
-    const pt = new Stream.PassThrough({ objectMode: true })
-
-    try {
-      path = cleanCID(path)
-    } catch (err) {
-      if (!v.ipfsPath(path)) {
-        return pt.destroy(err)
-      }
-    }
-
-    const request = { path: 'get', args: path, qs: opts }
-
-    // Convert the response stream to TarStream objects
-    send.andTransform(request, TarStreamToObjects, (err, stream) => {
-      if (err) { return pt.destroy(err) }
-
-      pump(stream, pt)
-    })
-
-    return pt
-  }
-}
diff --git a/src/files-regular/get.js b/src/files-regular/get.js
index 045158d88..6c942648b 100644
--- a/src/files-regular/get.js
+++ b/src/files-regular/get.js
@@ -1,52 +1,48 @@
 'use strict'
 
-const promisify = require('promisify-es6')
+const configure = require('../lib/configure')
+const tarStreamToObjects = require('../utils/tar-stream-to-objects')
+const IsIpfs = require('is-ipfs')
 const cleanCID = require('../utils/clean-cid')
-const TarStreamToObjects = require('../utils/tar-stream-to-objects')
-const concat = require('concat-stream')
-const through = require('through2')
-const v = require('is-ipfs')
-
-module.exports = (send) => {
-  return promisify((path, opts, callback) => {
-    if (typeof opts === 'function' && !callback) {
-      callback = opts
-      opts = {}
-    }
-
-    // opts is the real callback --
-    // 'callback' is being injected by promisify
-    if (typeof opts === 'function' && typeof callback === 'function') {
-      callback = opts
-      opts = {}
-    }
+
+module.exports = configure(({ ky }) => {
+  return async function * get (path, options) {
+    options = options || {}
 
     try {
       path = cleanCID(path)
    } catch (err) {
-      if (!v.ipfsPath(path)) {
-        return callback(err)
+      if (!IsIpfs.ipfsPath(path)) {
+        throw err
      }
    }
 
-    const request = { path: 'get', args: path, qs: opts }
+    const searchParams = new URLSearchParams()
+    searchParams.set('arg', path.toString())
 
-    // Convert the response stream to TarStream objects
-    send.andTransform(request, TarStreamToObjects, (err, stream) => {
-      if (err) { return callback(err) }
+    if (options.compress !== undefined) {
+      searchParams.set('compress', options.compress)
+    }
 
-      const files = []
+    if (options.compressionLevel !== undefined) {
+      searchParams.set('compression-level', options.compressionLevel)
+    }
 
-      stream.pipe(through.obj((file, enc, next) => {
-        if (file.content) {
-          file.content.pipe(concat((content) => {
-            files.push({ path: file.path, content: content })
-          }))
-        } else {
-          files.push(file)
-        }
-        next()
-      }, () => callback(null, files)))
+    if (options.offset) {
+      searchParams.set('offset', options.offset)
+    }
+
+    if (options.length) {
+      searchParams.set('length', options.length)
+    }
+
+    const res = await ky.get('get', {
+      timeout: options.timeout,
+      signal: options.signal,
+      headers: options.headers,
+      searchParams
     })
-  })
-}
+
+    yield * tarStreamToObjects(res.body)
+  }
+})
diff --git a/src/files-regular/index.js b/src/files-regular/index.js
index 9bfe10a5d..3381102d5 100644
--- a/src/files-regular/index.js
+++ b/src/files-regular/index.js
@@ -1,17 +1,24 @@
 'use strict'
 
 const nodeify = require('promise-nodeify')
-const moduleConfig = require('../utils/module-config')
+const callbackify = require('callbackify')
+const all = require('async-iterator-all')
 const { concatify, collectify, pullify, streamify } = require('../lib/converters')
+const toPullStream = require('async-iterator-to-pull-stream')
+const pull = require('pull-stream/pull')
+const map = require('pull-stream/throughs/map')
 
-module.exports = (arg) => {
-  const send = moduleConfig(arg)
-  const add = require('../add')(arg)
-  const addFromFs = require('../add-from-fs')(arg)
-  const addFromURL = require('../add-from-url')(arg)
-  const cat = require('../cat')(arg)
+module.exports = (config) => {
+  const add = require('../add')(config)
+  const addFromFs = require('../add-from-fs')(config)
+  const addFromURL = require('../add-from-url')(config)
+  const cat = require('../cat')(config)
+  const get = require('./get')(config)
+  const ls = require('./ls')(config)
+  const refs = require('./refs')(config)
+  const refsLocal = require('./refs-local')(config)
 
-  return {
+  const api = {
     add: (input, options, callback) => {
       if (typeof options === 'function') {
         callback = options
@@ -43,23 +50,51 @@ module.exports = (arg) => {
       return nodeify(collectify(add)(input, options), callback)
     },
     _addAsyncIterator: add,
-    cat: (path, options, callback) => {
-      if (typeof options === 'function') {
-        callback = options
-        options = {}
-      }
-      return nodeify(concatify(cat)(path, options), callback)
-    },
+    cat: callbackify.variadic((path, options) => concatify(cat)(path, options)),
     catReadableStream: streamify.readable(cat),
     catPullStream: pullify.source(cat),
-    get: require('../files-regular/get')(send),
-    getReadableStream: require('../files-regular/get-readable-stream')(send),
-    getPullStream: require('../files-regular/get-pull-stream')(send),
-    ls: require('../files-regular/ls')(send),
-    lsReadableStream: require('../files-regular/ls-readable-stream')(send),
-    lsPullStream: require('../files-regular/ls-pull-stream')(send),
-    refs: require('../files-regular/refs')(send),
-    refsReadableStream: require('../files-regular/refs-readable-stream')(send),
-    refsPullStream: require('../files-regular/refs-pull-stream')(send)
+    _catAsyncIterator: cat,
+    get: callbackify.variadic(async (path, options) => {
+      const output = []
+
+      for await (const entry of get(path, options)) {
+        if (entry.content) {
+          entry.content = Buffer.concat(await all(entry.content))
+        }
+
+        output.push(entry)
+      }
+
+      return output
+    }),
+    getReadableStream: streamify.readable(get),
+    getPullStream: (path, options) => {
+      return pull(
+        toPullStream(get(path, options)),
+        map(file => {
+          if (file.content) {
+            file.content = toPullStream(file.content)
+          }
+
+          return file
+        })
+      )
+    },
+    _getAsyncIterator: get,
+    ls: callbackify.variadic((path, options) => collectify(ls)(path, options)),
+    lsReadableStream: streamify.readable(ls),
+    lsPullStream: pullify.source(ls),
+    _lsAsyncIterator: ls,
+    refs: callbackify.variadic((path, options) => collectify(refs)(path, options)),
+    refsReadableStream: streamify.readable(refs),
+    refsPullStream: pullify.source(refs),
+    _refsAsyncIterator: refs
   }
+
+  api.refs.local = callbackify.variadic((options) => collectify(refsLocal)(options))
+  api.refs.localReadableStream = streamify.readable(refsLocal)
+  api.refs.localPullStream = pullify.source(refsLocal)
+  api.refs._localAsyncIterator = refsLocal
+
+  return api
 }
diff --git a/src/files-regular/ls-pull-stream.js b/src/files-regular/ls-pull-stream.js
deleted file mode 100644
index d1e238df0..000000000
--- a/src/files-regular/ls-pull-stream.js
+++ /dev/null
@@ -1,74 +0,0 @@
-'use strict'
-
-const moduleConfig = require('../utils/module-config')
-const values = require('pull-stream/sources/values')
-const deferred = require('pull-defer')
-const IsIpfs = require('is-ipfs')
-const cleanCID = require('../utils/clean-cid')
-
-module.exports = (arg) => {
-  const send = moduleConfig(arg)
-
-  return (args, opts, callback) => {
-    if (typeof (opts) === 'function') {
-      callback = opts
-      opts = {}
-    }
-
-    try {
-      args = cleanCID(args)
-    } catch (err) {
-      if (!IsIpfs.ipfsPath(args)) {
-        return callback(err)
-      }
-    }
-
-    const p = deferred.source()
-
-    send({ path: 'ls', args: args, qs: opts }, (err, results) => {
-      if (err) {
-        return callback(err)
-      }
-
-      let result = results.Objects
-      if (!result) {
-        return callback(new Error('expected .Objects in results'))
-      }
-
-      result = result[0]
-      if (!result) {
-        return callback(new Error('expected one array in results.Objects'))
-      }
-
-      result = result.Links
-      if (!Array.isArray(result)) {
-        return callback(new Error('expected one array in results.Objects[0].Links'))
-      }
-
-      result = result.map((link) => ({
-        depth: 1,
-        name: link.Name,
-        path: args + '/' + link.Name,
-        size: link.Size,
-        hash: link.Hash,
-        type: typeOf(link)
-      }))
-
-      p.resolve(values(result))
-    })
-
-    return p
-  }
-}
-
-function typeOf (link) {
-  switch (link.Type) {
-    case 1:
-    case 5:
-      return 'dir'
-    case 2:
-      return 'file'
-    default:
-      return 'unknown'
-  }
-}
diff --git a/src/files-regular/ls-readable-stream.js b/src/files-regular/ls-readable-stream.js
deleted file mode 100644
index 7ee980e31..000000000
--- a/src/files-regular/ls-readable-stream.js
+++ /dev/null
@@ -1,72 +0,0 @@
-'use strict'
-
-const moduleConfig = require('../utils/module-config')
-const Stream = require('readable-stream')
-const IsIpfs = require('is-ipfs')
-const cleanCID = require('../utils/clean-cid')
-
-module.exports = (arg) => {
-  const send = moduleConfig(arg)
-
-  return (args, opts, callback) => {
-    if (typeof (opts) === 'function') {
-      callback = opts
-      opts = {}
-    }
-
-    try {
-      args = cleanCID(args)
-    } catch (err) {
-      if (!IsIpfs.ipfsPath(args)) {
-        return callback(err)
-      }
-    }
-
-    const pt = new Stream.PassThrough({ objectMode: true })
-
-    send({ path: 'ls', args: args, qs: opts }, (err, results) => {
-      if (err) { return callback(err) }
-
-      let result = results.Objects
-      if (!result) {
-        return callback(new Error('expected .Objects in results'))
-      }
-
-      result = result[0]
-      if (!result) {
-        return callback(new Error('expected one array in results.Objects'))
-      }
-
-      result = result.Links
-      if (!Array.isArray(result)) {
-        return callback(new Error('expected one array in results.Objects[0].Links'))
-      }
-
-      result = result.map((link) => ({
-        depth: 1,
-        name: link.Name,
-        path: args + '/' + link.Name,
-        size: link.Size,
-        hash: link.Hash,
-        type: typeOf(link)
-      }))
-
-      result.forEach((item) => pt.write(item))
-      pt.end()
-    })
-
-    return pt
-  }
-}
-
-function typeOf (link) {
-  switch (link.Type) {
-    case 1:
-    case 5:
-      return 'dir'
-    case 2:
-      return 'file'
-    default:
-      return 'unknown'
-  }
-}
diff --git a/src/files-regular/ls.js b/src/files-regular/ls.js
index 8fbc2d16e..0f13f556d 100644
--- a/src/files-regular/ls.js
+++ b/src/files-regular/ls.js
@@ -1,64 +1,72 @@
 'use strict'
 
-const promisify = require('promisify-es6')
 const IsIpfs = require('is-ipfs')
-const moduleConfig = require('../utils/module-config')
 const cleanCID = require('../utils/clean-cid')
+const configure = require('../lib/configure')
 
-module.exports = (arg) => {
-  const send = moduleConfig(arg)
-
-  return promisify((args, opts, callback) => {
-    if (typeof (opts) === 'function') {
-      callback = opts
-      opts = {}
-    }
+module.exports = configure(({ ky }) => {
+  return async function * ls (path, options) {
+    options = options || {}
 
     try {
-      args = cleanCID(args)
+      path = cleanCID(path)
     } catch (err) {
-      if (!IsIpfs.ipfsPath(args)) {
-        return callback(err)
+      if (!IsIpfs.ipfsPath(path)) {
+        throw err
       }
     }
 
-    send({
-      path: 'ls',
-      args: args,
-      qs: opts
-    }, (err, results) => {
-      if (err) {
-        return callback(err)
-      }
+    const searchParams = new URLSearchParams()
+    searchParams.set('arg', path.toString())
 
-      let result = results.Objects
-      if (!result) {
-        return callback(new Error('expected .Objects in results'))
-      }
+    if (options.long !== undefined) {
+      searchParams.set('long', options.long)
+    }
 
-      result = result[0]
-      if (!result) {
-        return callback(new Error('expected one array in results.Objects'))
-      }
+    if (options.unsorted !== undefined) {
+      searchParams.set('unsorted', options.unsorted)
+    }
 
-      result = result.Links
-      if (!Array.isArray(result)) {
-        return callback(new Error('expected one array in results.Objects[0].Links'))
-      }
+    if (options.recursive !== undefined) {
+      searchParams.set('recursive', options.recursive)
+    }
+
+    const res = await ky.get('ls', {
+      timeout: options.timeout,
+      signal: options.signal,
+      headers: options.headers,
+      searchParams
+    })
+
+    let result = await res.json()
 
-      result = result.map((link) => ({
+    result = result.Objects
+    if (!result) {
+      throw new Error('expected .Objects in results')
+    }
+
+    result = result[0]
+    if (!result) {
+      throw new Error('expected one array in results.Objects')
+    }
+
+    result = result.Links
+    if (!Array.isArray(result)) {
+      throw new Error('expected one array in results.Objects[0].Links')
+    }
+
+    for (const link of result) {
+      yield {
         name: link.Name,
-        path: args + '/' + link.Name,
+        path: path + '/' + link.Name,
         size: link.Size,
         hash: link.Hash,
         type: typeOf(link),
         depth: link.Depth || 1
-      }))
-
-      callback(null, result)
-    })
-  })
-}
+      }
+    }
+  }
+})
 
 function typeOf (link) {
   switch (link.Type) {
diff --git a/src/files-regular/refs-local-pull-stream.js b/src/files-regular/refs-local-pull-stream.js
deleted file mode 100644
index c4452b116..000000000
--- a/src/files-regular/refs-local-pull-stream.js
+++ /dev/null
@@ -1,27 +0,0 @@
-'use strict'
-
-const pull = require('pull-stream')
-const toPull = require('stream-to-pull-stream')
-const deferred = require('pull-defer')
-const moduleConfig = require('../utils/module-config')
-
-module.exports = (send) => {
-  send = moduleConfig(send)
-
-  return (opts) => {
-    opts = opts || {}
-
-    const p = deferred.source()
-
-    send({ path: 'refs/local', qs: opts }, (err, stream) => {
-      if (err) { return p.resolve(pull.error(err)) }
-
-      p.resolve(pull(
-        toPull.source(stream),
-        pull.map(r => ({ ref: r.Ref, err: r.Err }))
-      ))
-    })
-
-    return p
-  }
-}
diff --git a/src/files-regular/refs-local-readable-stream.js b/src/files-regular/refs-local-readable-stream.js
deleted file mode 100644
index 09fddde1d..000000000
--- a/src/files-regular/refs-local-readable-stream.js
+++ /dev/null
@@ -1,25 +0,0 @@
-'use strict'
-
-const Stream = require('readable-stream')
-const pump = require('pump')
-const through = require('through2')
-
-module.exports = (send) => {
-  return (opts) => {
-    opts = opts || {}
-
-    const pt = new Stream.PassThrough({ objectMode: true })
-
-    send({ path: 'refs/local', qs: opts }, (err, stream) => {
-      if (err) { return pt.destroy(err) }
-
-      stream.once('error', (err) => pt.destroy(err))
-
-      pump(stream, through.obj(function (r, enc, cb) {
-        cb(null, { ref: r.Ref, err: r.Err })
-      }), pt)
-    })
-
-    return pt
-  }
-}
diff --git a/src/files-regular/refs-local.js b/src/files-regular/refs-local.js
index 680e51000..efb8d32d2 100644
--- a/src/files-regular/refs-local.js
+++ b/src/files-regular/refs-local.js
@@ -1,32 +1,22 @@
 'use strict'
 
-const promisify = require('promisify-es6')
-const streamToValueWithTransformer = require('../utils/stream-to-value-with-transformer')
-const moduleConfig = require('../utils/module-config')
+const configure = require('../lib/configure')
+const ndjson = require('iterable-ndjson')
+const toIterable = require('../lib/stream-to-iterable')
+const toCamel = require('../lib/object-to-camel')
 
-module.exports = (arg) => {
-  const send = moduleConfig(arg)
+module.exports = configure(({ ky }) => {
+  return async function * refsLocal (options) {
+    options = options || {}
 
-  return promisify((opts, callback) => {
-    if (typeof (opts) === 'function') {
-      callback = opts
-      opts = {}
-    }
-
-    const transform = (res, cb) => {
-      cb(null, res.map(r => ({ ref: r.Ref, err: r.Err })))
-    }
+    const res = await ky.get('refs/local', {
+      timeout: options.timeout,
+      signal: options.signal,
+      headers: options.headers
+    })
 
-    const request = {
-      path: 'refs/local',
-      qs: opts
+    for await (const file of ndjson(toIterable(res.body))) {
+      yield toCamel(file)
     }
-
-    send(request, (err, result) => {
-      if (err) {
-        return callback(err)
-      }
-
-      streamToValueWithTransformer(result, transform, callback)
-    })
-  })
-}
+  }
+})
diff --git a/src/files-regular/refs-pull-stream.js b/src/files-regular/refs-pull-stream.js
deleted file mode 100644
index e3c1b113a..000000000
--- a/src/files-regular/refs-pull-stream.js
+++ /dev/null
@@ -1,34 +0,0 @@
-'use strict'
-
-const pull = require('pull-stream')
-const toPull = require('stream-to-pull-stream')
-const deferred = require('pull-defer')
-const moduleConfig = require('../utils/module-config')
-const { checkArgs, normalizeOpts } = require('./refs')
-
-module.exports = (send) => {
-  send = moduleConfig(send)
-
-  return (args, opts) => {
-    opts = normalizeOpts(opts)
-
-    const p = deferred.source()
-
-    try {
-      args = checkArgs(args)
-    } catch (err) {
-      return p.end(err)
-    }
-
-    send({ path: 'refs', args, qs: opts }, (err, stream) => {
-      if (err) { return p.resolve(pull.error(err)) }
-
-      p.resolve(pull(
-        toPull.source(stream),
-        pull.map(r => ({ ref: r.Ref, err: r.Err }))
-      ))
-    })
-
-    return p
-  }
-}
diff --git a/src/files-regular/refs-readable-stream.js b/src/files-regular/refs-readable-stream.js
deleted file mode 100644
index b5cf69c48..000000000
--- a/src/files-regular/refs-readable-stream.js
+++ /dev/null
@@ -1,32 +0,0 @@
-'use strict'
-
-const Stream = require('readable-stream')
-const pump = require('pump')
-const through = require('through2')
-const { checkArgs, normalizeOpts } = require('./refs')
-
-module.exports = (send) => {
-  return (args, opts) => {
-    opts = normalizeOpts(opts)
-
-    const pt = new Stream.PassThrough({ objectMode: true })
-
-    try {
-      args = checkArgs(args)
-    } catch (err) {
-      return pt.destroy(err)
-    }
-
-    send({ path: 'refs', args, qs: opts }, (err, stream) => {
-      if (err) { return pt.destroy(err) }
-
-      stream.once('error', (err) => pt.destroy(err))
-
-      pump(stream, through.obj(function (r, enc, cb) {
-        cb(null, { ref: r.Ref, err: r.Err })
-      }), pt)
-    })
-
-    return pt
-  }
-}
diff --git a/src/files-regular/refs.js b/src/files-regular/refs.js
index 986a6f6cc..c6136ede5 100644
--- a/src/files-regular/refs.js
+++ b/src/files-regular/refs.js
@@ -1,75 +1,63 @@
 'use strict'
 
-const IsIpfs = require('is-ipfs')
-const promisify = require('promisify-es6')
-const streamToValueWithTransformer = require('../utils/stream-to-value-with-transformer')
-const moduleConfig = require('../utils/module-config')
+const configure = require('../lib/configure')
 const cleanCID = require('../utils/clean-cid')
+const IsIpfs = require('is-ipfs')
+const ndjson = require('iterable-ndjson')
+const toIterable = require('../lib/stream-to-iterable')
+const toCamel = require('../lib/object-to-camel')
 
-module.exports = (arg) => {
-  const send = moduleConfig(arg)
+module.exports = configure(({ ky }) => {
+  return async function * refs (args, options) {
+    options = options || {}
 
-  const refs = promisify((args, opts, callback) => {
-    if (typeof (opts) === 'function') {
-      callback = opts
-      opts = {}
-    }
-    opts = module.exports.normalizeOpts(opts)
+    const searchParams = new URLSearchParams()
 
-    try {
-      args = module.exports.checkArgs(args)
-    } catch (err) {
-      return callback(err)
+    if (options.format !== undefined) {
+      searchParams.set('format', options.format)
     }
 
-    const transform = (res, cb) => {
-      cb(null, res.map(r => ({ ref: r.Ref, err: r.Err })))
+    if (options.edges !== undefined) {
+      searchParams.set('edges', options.edges)
     }
 
-    const request = {
-      args,
-      path: 'refs',
-      qs: opts
+    if (options.unique !== undefined) {
+      searchParams.set('unique', options.unique)
     }
 
-    send(request, (err, result) => {
-      if (err) {
-        return callback(err)
-      }
-      streamToValueWithTransformer(result, transform, callback)
-    })
-  })
-
-  refs.local = require('./refs-local')(arg)
-  refs.localReadableStream = require('./refs-local-readable-stream')(arg)
-  refs.localPullStream = require('./refs-local-pull-stream')(arg)
+    if (options.recursive !== undefined) {
+      searchParams.set('recursive', options.recursive)
+    }
 
-  return refs
-}
+    if (options.maxDepth !== undefined) {
+      searchParams.set('max-depth', options.maxDepth)
+    }
 
-module.exports.checkArgs = (args) => {
-  const isArray = Array.isArray(args)
-  args = isArray ? args : [args]
+    if (!Array.isArray(args)) {
+      args = [args]
+    }
 
-  const res = []
-  for (let arg of args) {
-    try {
-      arg = cleanCID(arg)
-    } catch (err) {
-      if (!IsIpfs.ipfsPath(arg)) {
-        throw err
+    for (let arg of args) {
+      try {
+        arg = cleanCID(arg)
+      } catch (err) {
+        if (!IsIpfs.ipfsPath(arg)) {
+          throw err
+        }
       }
+
+      searchParams.append('arg', arg.toString())
     }
-    res.push(arg)
-  }
 
-  return isArray ? res : res[0]
-}
+    const res = await ky.get('refs', {
+      timeout: options.timeout,
+      signal: options.signal,
+      headers: options.headers,
+      searchParams
+    })
 
-module.exports.normalizeOpts = (opts) => {
-  opts = opts || {}
-  if (typeof opts.maxDepth === 'number') {
-    opts['max-depth'] = opts.maxDepth
+    for await (const file of ndjson(toIterable(res.body))) {
+      yield toCamel(file)
     }
-  return opts
-}
+  }
+})
diff --git a/src/utils/tar-stream-to-objects.js b/src/utils/tar-stream-to-objects.js
index 724544583..7241b3445 100644
--- a/src/utils/tar-stream-to-objects.js
+++ b/src/utils/tar-stream-to-objects.js
@@ -1,48 +1,85 @@
 'use strict'
 
-const pump = require('pump')
 const tar = require('tar-stream')
-const ReadableStream = require('readable-stream').Readable
+const { EventIterator } = require('event-iterator')
 
-class ObjectsStreams extends ReadableStream {
-  constructor (options) {
-    const opts = Object.assign(options || {}, { objectMode: true })
-    super(opts)
-  }
+function pipe (reader, writable) {
+  reader.read()
+    .then(({ done, value }) => {
+      if (done) {
+        writable.end()
+
+        return
+      }
+
+      if (value) {
+        const beneathHighWaterMark = writable.write(value)
 
-  _read () {}
+        if (beneathHighWaterMark) {
+          pipe(reader, writable)
+        } else {
+          writable.once('drain', () => {
+            pipe(reader, writable)
+          })
+        }
+      }
+    }, (err) => {
+      writable.emit('error', err)
+    })
 }
 
 /*
-  Transform a tar stream into a stream of objects:
+  Transform a tar readable stream into an async iterator of objects:
 
   Output format:
-  { path: 'string', content: Stream }
+  { path: 'string', content: AsyncIterator }
 */
-const TarStreamToObjects = (inputStream, callback) => {
-  const outputStream = new ObjectsStreams()
+async function * tarStreamToObjects (inputStream) {
   const extractStream = tar.extract()
+  let onEntry
+
+  const tarStream = new EventIterator(
+    (push, stop, fail) => {
+      onEntry = (header, stream, next) => {
+        push({ header, stream })
 
-  extractStream
-    .on('entry', (header, stream, next) => {
-      stream.on('end', next)
-
-      if (header.type !== 'directory') {
-        outputStream.push({
-          path: header.name,
-          content: stream
-        })
-      } else {
-        outputStream.push({
-          path: header.name
-        })
-        stream.resume()
+        next()
       }
-    })
-    .on('finish', () => outputStream.push(null))
+
+      extractStream.addListener('entry', onEntry)
+      extractStream.addListener('finish', stop)
+      extractStream.addListener('error', fail)
+    },
+    (push, stop, fail) => {
+      extractStream.removeListener('entry', onEntry)
+      extractStream.removeListener('finish', stop)
+      extractStream.removeListener('error', fail)
+      extractStream.destroy()
+    }
+  )
+
+  if (inputStream.pipe) {
+    // node stream
+    inputStream.pipe(extractStream)
+  } else if (inputStream.getReader) {
+    // browser readable stream
+    pipe(inputStream.getReader(), extractStream)
+  } else {
+    throw new Error('Unknown stream type')
+  }
 
-  pump(inputStream, extractStream)
-  callback(null, outputStream)
+  for await (const { header, stream } of tarStream) {
+    if (header.type === 'directory') {
+      yield {
+        path: header.name
+      }
+    } else {
+      yield {
+        path: header.name,
+        content: stream
+      }
+    }
+  }
 }
 
-module.exports = TarStreamToObjects
+module.exports = tarStreamToObjects
diff --git a/test/get.spec.js b/test/get.spec.js
index 304875bc7..7c1a63e74 100644
--- a/test/get.spec.js
+++ b/test/get.spec.js
@@ -59,7 +59,7 @@ describe('.get (specific go-ipfs features)', function () {
   it('err with out of range compression level', async () => {
     await expect(ipfs.get(smallFile.cid, {
       compress: true,
-      'compression-level': 10
+      compressionLevel: 10
     })).to.be.rejectedWith('compression level must be between 1 and 9')
   })
 
@@ -83,18 +83,11 @@ describe('.get (specific go-ipfs features)', function () {
   it('get path containing "+"s', async () => {
     const cid = 'QmPkmARcqjo5fqK1V1o8cFsuaXxWYsnwCNLJUYS4KeZyff'
-    let count = 0
     const files = await ipfs.get(cid)
-    files.forEach((file) => {
-      if (file.path !== cid) {
-        count++
-        expect(file.path).to.contain('+')
-
-        if (count === 2) {
-          // done()
-        }
-      }
-    })
+
+    expect(files).to.be.an('array').with.lengthOf(3)
+    expect(files[0]).to.have.property('path', cid)
+    expect(files[1]).to.have.property('path', `${cid}/c++files`)
+    expect(files[2]).to.have.property('path', `${cid}/c++files/ti,c64x+mega++mod-pic.txt`)
   })
 })
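
Note for reviewers (a sketch, not part of the patch): after this change every files-regular command is backed by an async iterator, with the callback, readable-stream and pull-stream variants layered on top in src/files-regular/index.js. A minimal consumption example, assuming an `ipfs` client created by this library and a `cid` already available on the node; the helper name `printTree` and the use of the internal `_getAsyncIterator` method are illustrative only:

    'use strict'

    // Same helper the patch itself uses to buffer a file's content into memory.
    const all = require('async-iterator-all')

    async function printTree (ipfs, cid) {
      // The iterator yields { path, content } entries; `content` is an
      // async iterable of Buffers for files and is absent for directories.
      for await (const entry of ipfs._getAsyncIterator(cid)) {
        if (entry.content) {
          const data = Buffer.concat(await all(entry.content))
          console.log(entry.path, data.length, 'bytes')
        } else {
          console.log(entry.path + '/')
        }
      }
    }

The public callback-style `get`, `getReadableStream` and `getPullStream` forms wired up in index.js remain available and are built on this same iterator.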