
Commit
test: most tests passing
achingbrain authored and alanshaw committed May 20, 2019
1 parent 812ec2e commit 607623d
Showing 7 changed files with 209 additions and 63 deletions.
4 changes: 3 additions & 1 deletion package.json
@@ -72,12 +72,12 @@
     "execa": "^1.0.0",
     "form-data": "^2.3.3",
     "hat": "0.0.3",
+    "interface-ipfs-core": "~0.102.0",
     "ipfsd-ctl": "~0.42.0",
     "libp2p-websocket-star": "~0.10.2",
     "ncp": "^2.0.0",
     "qs": "^6.5.2",
     "rimraf": "^2.6.2",
-    "interface-ipfs-core": "~0.102.0",
     "sinon": "^7.3.1",
     "stream-to-promise": "^2.2.0"
   },
@@ -86,7 +86,9 @@
     "@hapi/hapi": "^18.3.1",
     "@hapi/joi": "^15.0.1",
     "async": "^2.6.1",
+    "async-iterator-all": "0.0.2",
     "async-iterator-to-pull-stream": "^1.1.0",
+    "async-iterator-to-stream": "^1.1.0",
     "base32.js": "~0.1.0",
     "bignumber.js": "^8.0.2",
     "binary-querystring": "~0.1.2",
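
Note: the new dependencies bridge async iterators to the stream styles the rest of the codebase consumes. async-iterator-all collects everything an async iterator yields into an array, async-iterator-to-stream wraps one as a Node.js readable stream, and the already-present async-iterator-to-pull-stream exposes one as a pull-stream. A minimal sketch of async-iterator-all, mirroring how read() concatenates chunks in files-mfs.js below:

    const all = require('async-iterator-all')

    // stand-in for an async-iterator source such as mfs.read()
    async function * chunks () {
      yield Buffer.from('hello ')
      yield Buffer.from('world')
    }

    // all() resolves to an array of the yielded values
    all(chunks()).then(bufs => {
      console.log(Buffer.concat(bufs).toString()) // 'hello world'
    })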
76 changes: 71 additions & 5 deletions src/core/components/files-mfs.js
@@ -1,9 +1,75 @@
 'use strict'
 
 const mfs = require('ipfs-mfs/core')
+const toPullStream = require('async-iterator-to-pull-stream')
+const toReadableStream = require('async-iterator-to-stream')
+const all = require('async-iterator-all')
+const callbackify = require('util').callbackify
+const PassThrough = require('stream').PassThrough
+const pull = require('pull-stream/pull')
+const map = require('pull-stream/throughs/map')
 
-module.exports = self => mfs({
-  ipld: self._ipld,
-  repo: self._repo,
-  repoOwner: self._options.repoOwner
-})
+const mapLsFile = (options = {}) => {
+  const long = options.long || options.l
+
+  return (file) => {
+    return {
+      hash: long ? file.cid.toBaseEncodedString(options.cidBase) : '',
+      name: file.name,
+      type: long ? file.type : 0,
+      size: long ? file.size || 0 : 0
+    }
+  }
+}
+
+module.exports = self => {
+  const methods = mfs({
+    ipld: self._ipld,
+    blocks: self._blockService,
+    datastore: self._repo.root,
+    repoOwner: self._options.repoOwner
+  })
+
+  return {
+    cp: callbackify(methods.cp),
+    flush: callbackify(methods.flush),
+    ls: callbackify(async (path, options) => {
+      const files = await all(methods.ls(path, options))
+
+      return files.map(mapLsFile(options))
+    }),
+    lsReadableStream: (path, options) => {
+      const stream = toReadableStream.obj(methods.ls(path, options))
+      const through = new PassThrough({
+        objectMode: true
+      })
+      stream.on('data', (file) => through.emit('data', mapLsFile(options)(file)))
+      stream.on('error', through.emit.bind(through, 'error'))
+      stream.on('end', through.emit.bind(through, 'end'))
+
+      return through
+    },
+    lsPullStream: (path, options) => {
+      return pull(
+        toPullStream.source(methods.ls(path, options)),
+        map(mapLsFile(options))
+      )
+    },
+    mkdir: callbackify(methods.mkdir),
+    mv: callbackify(methods.mv),
+    read: callbackify(async (path, options) => {
+      return Buffer.concat(await all(methods.read(path, options)))
+    }),
+    readPullStream: (path, options) => {
+      return toPullStream.source(methods.read(path, options))
+    },
+    readReadableStream: (path, options) => {
+      return toReadableStream(methods.read(path, options))
+    },
+    rm: callbackify(methods.rm),
+    stat: callbackify(methods.stat),
+    write: callbackify(methods.write)
+  }
+}
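
Note: the pattern above is one bridge per API style: util.callbackify adapts the now promise-based ipfs-mfs core methods back to the node-style callback API, while the ls/read variants re-expose async iterators as readable or pull streams. The callbackify half in isolation, as a minimal sketch with a stand-in for a real mfs method:

    const { callbackify } = require('util')

    // stand-in for a promise-returning mfs method such as stat
    const stat = async (path) => ({ path, type: 'directory' })

    // exposed to callers in node-style (err, result) form
    const statCb = callbackify(stat)

    statCb('/', (err, res) => {
      if (err) throw err
      console.log(res) // { path: '/', type: 'directory' }
    })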
54 changes: 39 additions & 15 deletions src/core/components/files-regular/add-pull-stream.js
@@ -1,6 +1,7 @@
 'use strict'
 
 const importer = require('ipfs-unixfs-importer')
-const kindOf = require('kind-of')
-const CID = require('cids')
-const pullValues = require('pull-stream/sources/values')
@@ -12,6 +13,15 @@ const toPull = require('stream-to-pull-stream')
 const waterfall = require('async/waterfall')
 const isStream = require('is-stream')
-const { isSource } = require('is-pull-stream')
+const toAsyncIterator = require('pull-stream-to-async-iterator')
+const toPullStream = require('async-iterator-to-pull-stream')
+const pull = require('pull-stream')
+const isSource = require('is-pull-stream').isSource
 const { parseChunkerString } = require('./utils')
 const streamFromFileReader = require('ipfs-utils/src/streams/stream-from-filereader')
 const { supportsFileReader } = require('ipfs-utils/src/supports')
@@ -23,30 +33,28 @@ function noop () {}
 function prepareFile (file, self, opts, callback) {
   opts = opts || {}
 
-  let cid = new CID(file.multihash)
-
-  if (opts.cidVersion === 1) {
-    cid = cid.toV1()
-  }
+  let cid = file.cid
 
   waterfall([
     (cb) => opts.onlyHash
       ? cb(null, file)
-      : self.object.get(file.multihash, Object.assign({}, opts, { preload: false }), cb),
+      : self.object.get(file.cid, Object.assign({}, opts, { preload: false }), cb),
     (node, cb) => {
-      const b58Hash = cid.toBaseEncodedString()
+      if (opts.cidVersion === 1) {
+        cid = cid.toV1()
+      }
+
+      const b58Hash = cid.toBaseEncodedString()
 
       let size = node.size
 
       if (Buffer.isBuffer(node)) {
         size = node.length
       }
 
       cb(null, {
-        path: opts.wrapWithDirectory
-          ? file.path.substring(WRAPPER.length)
-          : (file.path || b58Hash),
+        path: file.path === undefined ? b58Hash : (file.path || ''),
         hash: b58Hash,
+        // multihash: b58Hash,
         size
       })
     }
@@ -90,16 +98,16 @@ function normalizeContent (content, opts) {
       throw new Error('Must provide a path when wrapping with a directory')
     }
 
-    if (opts.wrapWithDirectory) {
-      data.path = WRAPPER + data.path
-    }
+    // if (opts.wrapWithDirectory) {
+    //   data.path = WRAPPER + data.path
+    // }
 
     return data
   })
 }
 
 function preloadFile (file, self, opts) {
-  const isRootFile = opts.wrapWithDirectory
+  const isRootFile = !file.path || opts.wrapWithDirectory
     ? file.path === ''
     : !file.path.includes('/')
@@ -140,7 +148,10 @@ module.exports = function (self) {
       shardSplitThreshold: self._options.EXPERIMENTAL.sharding
         ? 1000
         : Infinity
-    }, options, chunkerOptions)
+    }, options, {
+      ...chunkerOptions.chunkerOptions,
+      chunker: chunkerOptions.chunker
+    })
 
     // CID v0 is for multihashes encoded with sha2-256
     if (opts.hashAlg && opts.cidVersion !== 1) {
@@ -157,12 +168,25 @@ module.exports = function (self) {
 
     opts.progress = progress
     return pull(
-      pullMap(content => normalizeContent(content, opts)),
-      pullFlatten(),
-      importer(self._ipld, opts),
-      pullAsyncMap((file, cb) => prepareFile(file, self, opts, cb)),
-      pullMap(file => preloadFile(file, self, opts)),
-      pullAsyncMap((file, cb) => pinFile(file, self, opts, cb))
+      pull.map(content => normalizeContent(content, opts)),
+      pull.flatten(),
+      pull.map(file => ({
+        path: file.path ? file.path : undefined,
+        content: file.content ? toAsyncIterator(file.content) : undefined
+      })),
+      toPullStream.transform(source => importer(source, self._ipld, opts)),
+      pull.asyncMap((file, cb) => prepareFile(file, self, opts, cb)),
+      pull.map(file => preloadFile(file, self, opts)),
+      pull.asyncMap((file, cb) => pinFile(file, self, opts, cb))
     )
   }
 }
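
Note: the pipeline keeps its pull-stream surface but delegates the import work to the async-iterator importer: pull-stream-to-async-iterator converts each file's pull-stream content into an async iterator for the importer, and async-iterator-to-pull-stream's transform wraps the importer back into a pull-stream through. A rough sketch of that round trip, assuming transform hands the upstream values to the wrapped function as an async iterable (which is how it is used above), with a trivial transform standing in for the importer:

    const pull = require('pull-stream')
    const toPullStream = require('async-iterator-to-pull-stream')

    // trivial async-iterator transform standing in for importer()
    async function * upperCase (source) {
      for await (const s of source) {
        yield s.toUpperCase()
      }
    }

    pull(
      pull.values(['a', 'b', 'c']),
      toPullStream.transform(source => upperCase(source)),
      pull.collect((err, res) => {
        if (err) throw err
        console.log(res) // ['A', 'B', 'C']
      })
    )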
27 changes: 9 additions & 18 deletions src/core/components/files-regular/cat-pull-stream.js
@@ -1,8 +1,8 @@
 'use strict'
 
 const exporter = require('ipfs-unixfs-exporter')
-const pull = require('pull-stream')
 const deferred = require('pull-defer')
+const toPullStream = require('async-iterator-to-pull-stream')
 const { normalizePath } = require('./utils')
 
 module.exports = function (self) {
@@ -15,40 +15,31 @@
 
     ipfsPath = normalizePath(ipfsPath)
     const pathComponents = ipfsPath.split('/')
-    const fileNameOrHash = pathComponents[pathComponents.length - 1]
 
     if (options.preload !== false) {
       self._preload(pathComponents[0])
     }
 
     const d = deferred.source()
 
-    pull(
-      exporter(ipfsPath, self._ipld, options),
-      pull.filter(file => file.path === fileNameOrHash),
-      pull.take(1),
-      pull.collect((err, files) => {
-        if (err) {
-          return d.abort(err)
-        }
-
-        if (!files.length) {
-          return d.abort(new Error('No such file'))
-        }
-
-        const file = files[0]
-
-        if (!file.content && file.type === 'dir') {
+    exporter(ipfsPath, self._ipld, options)
+      .then(file => {
+        if (!file.unixfs) {
+          return d.abort(new Error('this dag node is not a UnixFS node'))
+        }
+
+        if (file.unixfs.type.includes('dir')) {
           return d.abort(new Error('this dag node is a directory'))
         }
 
         if (!file.content) {
           return d.abort(new Error('this dag node has no content'))
         }
 
-        d.resolve(file.content)
-      })
-    )
+        d.resolve(toPullStream.source(file.content(options)))
+      }, err => {
+        d.abort(err)
+      })
 
     return d
   }
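
Note: exporter() is now promise-based, but cat has to return a pull-stream source synchronously, hence pull-defer: hand back a deferred source immediately, then resolve it with the real content (or abort with an error) once the promise settles. The pattern in isolation, as a minimal sketch:

    const pull = require('pull-stream')
    const deferred = require('pull-defer')

    function lazySource () {
      const d = deferred.source()

      // stand-in for the exporter() promise
      Promise.resolve(['some', 'content'])
        .then(values => d.resolve(pull.values(values)), err => d.abort(err))

      // consumers can start reading from this before the promise settles
      return d
    }

    pull(lazySource(), pull.collect((err, res) => {
      if (err) throw err
      console.log(res) // ['some', 'content']
    }))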
17 changes: 9 additions & 8 deletions src/core/components/files-regular/get-pull-stream.js
@@ -1,9 +1,11 @@
 'use strict'
 
 const exporter = require('ipfs-unixfs-exporter')
-const pull = require('pull-stream')
+const toPullStream = require('async-iterator-to-pull-stream')
 const errCode = require('err-code')
-const { normalizePath } = require('./utils')
+const pull = require('pull-stream/pull')
+const map = require('pull-stream/throughs/map')
+const { normalizePath, mapFile } = require('./utils')
 
 module.exports = function (self) {
   return (ipfsPath, options) => {
@@ -22,12 +24,11 @@
     }
 
     return pull(
-      exporter(ipfsPath, self._ipld, options),
-      pull.map(file => {
-        file.hash = file.cid.toString()
-        delete file.cid
-        return file
-      })
+      toPullStream.source(exporter.recursive(ipfsPath, self._ipld, options)),
+      map(mapFile({
+        ...options,
+        includeContent: true
+      }))
     )
   }
 }
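
Note: toPullStream.source() is the other half of the bridge, turning any async iterable (here the recursive exporter) into a pull-stream source. Roughly:

    const pull = require('pull-stream')
    const toPullStream = require('async-iterator-to-pull-stream')

    // stand-in for exporter.recursive(...)
    async function * entries () {
      yield { path: 'QmHash', name: 'QmHash' }
      yield { path: 'QmHash/file.txt', name: 'file.txt' }
    }

    pull(
      toPullStream.source(entries()),
      pull.collect((err, files) => {
        if (err) throw err
        console.log(files.map(f => f.path)) // ['QmHash', 'QmHash/file.txt']
      })
    )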
61 changes: 45 additions & 16 deletions src/core/components/files-regular/ls-pull-stream.js
@@ -1,8 +1,14 @@
 'use strict'
 
 const exporter = require('ipfs-unixfs-exporter')
-const pull = require('pull-stream')
-const { normalizePath } = require('./utils')
+const deferred = require('pull-defer')
+const pull = require('pull-stream/pull')
+const once = require('pull-stream/sources/once')
+const map = require('pull-stream/throughs/map')
+const filter = require('pull-stream/throughs/filter')
+const errCode = require('err-code')
+const toPullStream = require('async-iterator-to-pull-stream')
+const { normalizePath, mapFile } = require('./utils')
 
 module.exports = function (self) {
   return function (ipfsPath, options) {
@@ -11,25 +17,48 @@
     const path = normalizePath(ipfsPath)
     const recursive = options.recursive
    const pathComponents = path.split('/')
-    const pathDepth = pathComponents.length
-    const maxDepth = recursive ? global.Infinity : pathDepth
-    options.maxDepth = options.maxDepth || maxDepth
 
     if (options.preload !== false) {
       self._preload(pathComponents[0])
     }
 
-    return pull(
-      exporter(ipfsPath, self._ipld, options),
-      pull.filter(node =>
-        recursive ? node.depth >= pathDepth : node.depth === pathDepth
-      ),
-      pull.map(node => {
-        node.hash = node.cid.toString()
-        delete node.cid
-        delete node.content
-        return node
-      })
-    )
+    const d = deferred.source()
+
+    exporter(ipfsPath, self._ipld, options)
+      .then(file => {
+        if (!file.unixfs) {
+          return d.abort(errCode(new Error('dag node was not a UnixFS node'), 'ENOTUNIXFS'))
+        }
+
+        if (file.unixfs.type === 'file') {
+          return d.resolve(once(mapFile(options)(file)))
+        }
+
+        if (file.unixfs.type.includes('dir')) {
+          if (recursive) {
+            return d.resolve(pull(
+              toPullStream.source(exporter.recursive(file.cid, self._ipld, options)),
+              filter(child => file.cid.toBaseEncodedString() !== child.cid.toBaseEncodedString()),
+              map(mapFile(options))
+            ))
+          }
+
+          return d.resolve(pull(
+            toPullStream.source(file.content()),
+            map(mapFile(options)),
+            map((file) => {
+              file.depth--
+
+              return file
+            })
+          ))
+        }
+
+        d.abort(errCode(new Error(`Unknown UnixFS type ${file.unixfs.type}`), 'EUNKNOWNUNIXFSTYPE'))
+      }, err => {
+        d.abort(err)
+      })
 
    return d
  }
}
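
Note: all three branches funnel into the same deferred source, so callers always get a uniform pull-stream whether the path resolved to a single file or a directory listing. The single-file branch reduces to once(), which emits exactly one value:

    const pull = require('pull-stream')
    const once = require('pull-stream/sources/once')

    // a hypothetical single ls entry, shaped roughly like mapFile's output
    const entry = { name: 'file.txt', type: 0, size: 13 }

    pull(once(entry), pull.collect((err, files) => {
      if (err) throw err
      console.log(files) // [ { name: 'file.txt', type: 0, size: 13 } ]
    }))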