Skip to content
This repository has been archived by the owner on Mar 10, 2020. It is now read-only.

Commit

Permalink
chore: converts remaining files api to async iterators (#1124)
Browse files Browse the repository at this point in the history
  • Loading branch information
achingbrain committed Oct 15, 2019
1 parent 191f414 commit d087b72
Show file tree
Hide file tree
Showing 16 changed files with 274 additions and 568 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
"detect-node": "^2.0.4",
"end-of-stream": "^1.4.1",
"err-code": "^2.0.0",
"event-iterator": "^1.2.0",
"explain-error": "^1.0.4",
"flatmap": "0.0.3",
"form-data": "^2.5.1",
Expand Down
44 changes: 0 additions & 44 deletions src/files-regular/get-pull-stream.js

This file was deleted.

34 changes: 0 additions & 34 deletions src/files-regular/get-readable-stream.js

This file was deleted.

70 changes: 33 additions & 37 deletions src/files-regular/get.js
Original file line number Diff line number Diff line change
@@ -1,52 +1,48 @@
'use strict'

const promisify = require('promisify-es6')
const configure = require('../lib/configure')
const tarStreamToObjects = require('../utils/tar-stream-to-objects')
const IsIpfs = require('is-ipfs')
const cleanCID = require('../utils/clean-cid')
const TarStreamToObjects = require('../utils/tar-stream-to-objects')
const concat = require('concat-stream')
const through = require('through2')
const v = require('is-ipfs')

module.exports = (send) => {
return promisify((path, opts, callback) => {
if (typeof opts === 'function' && !callback) {
callback = opts
opts = {}
}

// opts is the real callback --
// 'callback' is being injected by promisify
if (typeof opts === 'function' && typeof callback === 'function') {
callback = opts
opts = {}
}
module.exports = configure(({ ky }) => {
return async function * get (path, options) {
options = options || {}

try {
path = cleanCID(path)
} catch (err) {
if (!v.ipfsPath(path)) {
return callback(err)
if (!IsIpfs.ipfsPath(path)) {
throw err
}
}

const request = { path: 'get', args: path, qs: opts }
const searchParams = new URLSearchParams()
searchParams.set('arg', path.toString())

// Convert the response stream to TarStream objects
send.andTransform(request, TarStreamToObjects, (err, stream) => {
if (err) { return callback(err) }
if (options.compress !== undefined) {
searchParams.set('compress', options.compress)
}

const files = []
if (options.compressionLevel !== undefined) {
searchParams.set('compression-level', options.compressionLevel)
}

stream.pipe(through.obj((file, enc, next) => {
if (file.content) {
file.content.pipe(concat((content) => {
files.push({ path: file.path, content: content })
}))
} else {
files.push(file)
}
next()
}, () => callback(null, files)))
if (options.offset) {
searchParams.set('offset', options.offset)
}

if (options.length) {
searchParams.set('length', options.length)
}

const res = await ky.get('get', {
timeout: options.timeout,
signal: options.signal,
headers: options.headers,
searchParams
})
})
}

yield * tarStreamToObjects(res.body)
}
})
83 changes: 59 additions & 24 deletions src/files-regular/index.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
'use strict'

const nodeify = require('promise-nodeify')
const moduleConfig = require('../utils/module-config')
const callbackify = require('callbackify')
const all = require('async-iterator-all')
const { concatify, collectify, pullify, streamify } = require('../lib/converters')
const toPullStream = require('async-iterator-to-pull-stream')
const pull = require('pull-stream/pull')
const map = require('pull-stream/throughs/map')

module.exports = (arg) => {
const send = moduleConfig(arg)
const add = require('../add')(arg)
const addFromFs = require('../add-from-fs')(arg)
const addFromURL = require('../add-from-url')(arg)
const cat = require('../cat')(arg)
module.exports = (config) => {
const add = require('../add')(config)
const addFromFs = require('../add-from-fs')(config)
const addFromURL = require('../add-from-url')(config)
const cat = require('../cat')(config)
const get = require('./get')(config)
const ls = require('./ls')(config)
const refs = require('./refs')(config)
const refsLocal = require('./refs-local')(config)

return {
const api = {
add: (input, options, callback) => {
if (typeof options === 'function') {
callback = options
Expand Down Expand Up @@ -43,23 +50,51 @@ module.exports = (arg) => {
return nodeify(collectify(add)(input, options), callback)
},
_addAsyncIterator: add,
cat: (path, options, callback) => {
if (typeof options === 'function') {
callback = options
options = {}
}
return nodeify(concatify(cat)(path, options), callback)
},
cat: callbackify.variadic((path, options) => concatify(cat)(path, options)),
catReadableStream: streamify.readable(cat),
catPullStream: pullify.source(cat),
get: require('../files-regular/get')(send),
getReadableStream: require('../files-regular/get-readable-stream')(send),
getPullStream: require('../files-regular/get-pull-stream')(send),
ls: require('../files-regular/ls')(send),
lsReadableStream: require('../files-regular/ls-readable-stream')(send),
lsPullStream: require('../files-regular/ls-pull-stream')(send),
refs: require('../files-regular/refs')(send),
refsReadableStream: require('../files-regular/refs-readable-stream')(send),
refsPullStream: require('../files-regular/refs-pull-stream')(send)
_catAsyncIterator: cat,
get: callbackify.variadic(async (path, options) => {
const output = []

for await (const entry of get(path, options)) {
if (entry.content) {
entry.content = Buffer.concat(await all(entry.content))
}

output.push(entry)
}

return output
}),
getReadableStream: streamify.readable(get),
getPullStream: (path, options) => {
return pull(
toPullStream(get(path, options)),
map(file => {
if (file.content) {
file.content = toPullStream(file.content)
}

return file
})
)
},
_getAsyncIterator: get,
ls: callbackify.variadic((path, options) => collectify(ls)(path, options)),
lsReadableStream: streamify.readable(ls),
lsPullStream: pullify.source(ls),
_lsAsyncIterator: ls,
refs: callbackify.variadic((path, options) => collectify(refs)(path, options)),
refsReadableStream: streamify.readable(refs),
refsPullStream: pullify.source(refs),
_refsAsyncIterator: refs
}

api.refs.local = callbackify.variadic((options) => collectify(refsLocal)(options))
api.refs.localReadableStream = streamify.readable(refsLocal)
api.refs.localPullStream = pullify.source(refsLocal)
api.refs._localAsyncIterator = refsLocal

return api
}
74 changes: 0 additions & 74 deletions src/files-regular/ls-pull-stream.js

This file was deleted.

Loading

0 comments on commit d087b72

Please sign in to comment.