Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

refactor: modularise files #1772

Merged
merged 5 commits into from
Dec 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
511 changes: 0 additions & 511 deletions src/core/components/files-regular.js

This file was deleted.

156 changes: 156 additions & 0 deletions src/core/components/files-regular/add-pull-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
'use strict'

const { importer } = require('ipfs-unixfs-engine')
const pull = require('pull-stream')
const toPull = require('stream-to-pull-stream')
const waterfall = require('async/waterfall')
const isStream = require('is-stream')
const isSource = require('is-pull-stream').isSource
const CID = require('cids')
const { parseChunkerString } = require('./utils')

const WRAPPER = 'wrapper/'

// Default progress callback: intentionally does nothing.
const noop = () => {}

// Convert a raw importer result into the { path, hash, size } shape that
// `add` hands back to callers.
//
// self     - IPFS node instance (used for object.get)
// opts     - add options (cidVersion, onlyHash, wrapWithDirectory, ...)
// file     - importer output ({ path, multihash, size })
// callback - (err, { path, hash, size })
function prepareFile (self, opts, file, callback) {
  opts = opts || {}

  const baseCid = new CID(file.multihash)
  // Callers may request CIDv1 explicitly; otherwise keep the v0 form
  const cid = opts.cidVersion === 1 ? baseCid.toV1() : baseCid

  const fetchNode = (cb) => {
    if (opts.onlyHash) {
      // Nothing was written to the repo, so there is no node to fetch back
      return cb(null, file)
    }
    self.object.get(file.multihash, Object.assign({}, opts, { preload: false }), cb)
  }

  waterfall([
    fetchNode,
    (node, cb) => {
      const b58Hash = cid.toBaseEncodedString()
      // Raw leaves come back as Buffers rather than DAG nodes
      const size = Buffer.isBuffer(node) ? node.length : node.size

      cb(null, {
        // Strip the synthetic wrapper prefix when wrapping with a directory
        path: opts.wrapWithDirectory
          ? file.path.substring(WRAPPER.length)
          : (file.path || b58Hash),
        hash: b58Hash,
        size
      })
    }
  ], callback)
}

// Coerce the many accepted `add` input shapes — a Buffer, a Node.js
// readable stream, a pull-stream source, or a { path, content } object
// (or an array of any of those) — into an array of entries whose
// `content` is always a pull-stream source.
function normalizeContent (opts, content) {
  const items = Array.isArray(content) ? content : [content]

  return items.map((item) => {
    // Bare inputs get wrapped into an entry with an empty path.
    // These three cases are mutually exclusive, so one chain suffices.
    if (Buffer.isBuffer(item)) {
      item = { path: '', content: pull.values([item]) }
    } else if (isStream.readable(item)) {
      item = { path: '', content: toPull.source(item) }
    } else if (isSource(item)) {
      item = { path: '', content: item }
    }

    // Entry objects may themselves carry content as a Buffer or stream
    // (a pull source is already a function, so skip those)
    if (item && item.content && typeof item.content !== 'function') {
      if (Buffer.isBuffer(item.content)) {
        item.content = pull.values([item.content])
      }

      if (isStream.readable(item.content)) {
        item.content = toPull.source(item.content)
      }
    }

    if (opts.wrapWithDirectory) {
      if (!item.path) {
        throw new Error('Must provide a path when wrapping with a directory')
      }
      // Prefix every path so the importer builds a wrapping directory
      item.path = WRAPPER + item.path
    }

    return item
  })
}

// Trigger a preload for root entries only; pass the file through
// unchanged (used inside a pull.map stage).
function preloadFile (self, opts, file) {
  // With wrapWithDirectory the wrapping dir (path === '') is the root;
  // otherwise any entry without a '/' in its path is top-level.
  const isRoot = opts.wrapWithDirectory
    ? file.path === ''
    : !file.path.includes('/')

  if (isRoot && !opts.onlyHash && opts.preload !== false) {
    self._preload(file.hash)
  }

  return file
}

// Pin a file if it is the root dir of a recursive add or the single file
// of a direct add. Skipped when pinning is disabled, nothing was written
// (onlyHash), or a custom hash algorithm is in use.
function pinFile (self, opts, file, cb) {
  const wantPin = 'pin' in opts ? opts.pin : true
  const isRoot = !file.path.includes('/')

  if (wantPin && isRoot && !opts.onlyHash && !opts.hashAlg) {
    // Preloading is handled separately, so suppress it here
    return self.pin.add(file.hash, { preload: false }, (err) => cb(err, file))
  }

  cb(null, file)
}

module.exports = function (self) {
// Internal add func that gets used by all add funcs
return function addPullStream (options = {}) {
let chunkerOptions
try {
chunkerOptions = parseChunkerString(options.chunker)
} catch (err) {
return pull.map(() => { throw err })
}
const opts = Object.assign({}, {
shardSplitThreshold: self._options.EXPERIMENTAL.sharding
? 1000
: Infinity
}, options, chunkerOptions)

// CID v0 is for multihashes encoded with sha2-256
if (opts.hashAlg && opts.cidVersion !== 1) {
opts.cidVersion = 1
}

let total = 0

const prog = opts.progress || noop
const progress = (bytes) => {
total += bytes
prog(total)
}

opts.progress = progress
return pull(
pull.map(normalizeContent.bind(null, opts)),
pull.flatten(),
importer(self._ipld, opts),
pull.asyncMap(prepareFile.bind(null, self, opts)),
pull.map(preloadFile.bind(null, self, opts)),
pull.asyncMap(pinFile.bind(null, self, opts))
)
}
}
53 changes: 53 additions & 0 deletions src/core/components/files-regular/add-readable-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
'use strict'

const pull = require('pull-stream')
const pushable = require('pull-pushable')
const Duplex = require('readable-stream').Duplex

// Duplex (object mode) stream bridging Node.js streams to a pull-stream
// pipeline: writes are fed into a pull-pushable source, reads are pulled
// from the pipeline's output.
class AddHelper extends Duplex {
  constructor (pullStream, push, options) {
    super(Object.assign({ objectMode: true }, options))
    // Output end of the pull pipeline that results are read from
    this._pullStream = pullStream
    // Input end (pull-pushable) that written chunks are pushed into
    this._pushable = push
    // Write callbacks held until the pipeline pulls again (backpressure)
    this._waitingPullFlush = []
  }

  _read () {
    this._pullStream(null, (end, data) => {
      // A pull means the pipeline consumed pending input — ack any
      // writers that were waiting for backpressure to clear
      while (this._waitingPullFlush.length) {
        const cb = this._waitingPullFlush.shift()
        cb()
      }
      if (end) {
        if (end instanceof Error) {
          this.emit('error', end)
        }
        // NOTE(review): a non-error end never calls this.push(null), so
        // the readable side appears never to emit 'end' — confirm intended
      } else {
        this.push(data)
      }
    })
  }

  _write (chunk, encoding, callback) {
    // Defer the write ack until the pull side drains (see _read)
    this._waitingPullFlush.push(callback)
    this._pushable.push(chunk)
  }
}

module.exports = function (self) {
return (options) => {
options = options || {}

const p = pushable()
const s = pull(
p,
self.addPullStream(options)
)

const retStream = new AddHelper(s, p)

retStream.once('finish', () => p.end())

return retStream
}
}
62 changes: 62 additions & 0 deletions src/core/components/files-regular/add.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
'use strict'

const promisify = require('promisify-es6')
const pull = require('pull-stream')
const sort = require('pull-sort')
const isStream = require('is-stream')
const isSource = require('is-pull-stream').isSource
const isString = require('lodash/isString')

module.exports = function (self) {
const add = promisify((data, options, callback) => {
if (typeof options === 'function') {
callback = options
options = {}
}

options = options || {}

// Buffer, pull stream or Node.js stream
const isBufferOrStream = obj => Buffer.isBuffer(obj) || isStream.readable(obj) || isSource(obj)
// An object like { content?, path? }, where content isBufferOrStream and path isString
const isContentObject = obj => {
if (typeof obj !== 'object') return false
// path is optional if content is present
if (obj.content) return isBufferOrStream(obj.content)
// path must be a non-empty string if no content
return Boolean(obj.path) && isString(obj.path)
}
// An input atom: a buffer, stream or content object
const isInput = obj => isBufferOrStream(obj) || isContentObject(obj)
// All is ok if data isInput or data is an array of isInput
const ok = isInput(data) || (Array.isArray(data) && data.every(isInput))

if (!ok) {
return callback(new Error('invalid input: expected buffer, readable stream, pull stream, object or array of objects'))
}

pull(
pull.values([data]),
self.addPullStream(options),
sort((a, b) => {
if (a.path < b.path) return 1
if (a.path > b.path) return -1
return 0
}),
pull.collect(callback)
)
})

return function () {
const args = Array.from(arguments)

// If we .add(<pull stream>), then promisify thinks the pull stream
// is a callback! Add an empty options object in this case so that a
// promise is returned.
if (args.length === 1 && isSource(args[0])) {
args.push({})
}

return add.apply(null, args)
}
}
56 changes: 56 additions & 0 deletions src/core/components/files-regular/cat-pull-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
'use strict'

const { exporter } = require('ipfs-unixfs-engine')
const pull = require('pull-stream')
const deferred = require('pull-defer')
const { normalizePath } = require('./utils')

module.exports = function (self) {
return function catPullStream (ipfsPath, options) {
if (typeof ipfsPath === 'function') {
throw new Error('You must supply an ipfsPath')
}

options = options || {}

ipfsPath = normalizePath(ipfsPath)
const pathComponents = ipfsPath.split('/')
const restPath = normalizePath(pathComponents.slice(1).join('/'))
const filterFile = (file) => (restPath && file.path === restPath) || (file.path === ipfsPath)

if (options.preload !== false) {
self._preload(pathComponents[0])
}

const d = deferred.source()

pull(
exporter(ipfsPath, self._ipld, options),
pull.filter(filterFile),
pull.take(1),
pull.collect((err, files) => {
if (err) {
return d.abort(err)
}

if (!files.length) {
return d.abort(new Error('No such file'))
}

const file = files[0]

if (!file.content && file.type === 'dir') {
return d.abort(new Error('this dag node is a directory'))
}

if (!file.content) {
return d.abort(new Error('this dag node has no content'))
}

d.resolve(file.content)
})
)

return d
}
}
7 changes: 7 additions & 0 deletions src/core/components/files-regular/cat-readable-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
'use strict'

const toStream = require('pull-stream-to-stream')

module.exports = function (self) {
return (ipfsPath, options) => toStream.source(self.catPullStream(ipfsPath, options))
}
21 changes: 21 additions & 0 deletions src/core/components/files-regular/cat.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
'use strict'

const promisify = require('promisify-es6')
const pull = require('pull-stream')

module.exports = function (self) {
return promisify((ipfsPath, options, callback) => {
if (typeof options === 'function') {
callback = options
options = {}
}

pull(
self.catPullStream(ipfsPath, options),
pull.collect((err, buffers) => {
if (err) { return callback(err) }
callback(null, Buffer.concat(buffers))
})
)
})
}
26 changes: 26 additions & 0 deletions src/core/components/files-regular/get-pull-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
'use strict'

const { exporter } = require('ipfs-unixfs-engine')
const pull = require('pull-stream')
const errCode = require('err-code')
const { normalizePath } = require('./utils')

module.exports = function (self) {
return (ipfsPath, options) => {
options = options || {}

if (options.preload !== false) {
let pathComponents

try {
pathComponents = normalizePath(ipfsPath).split('/')
} catch (err) {
return pull.error(errCode(err, 'ERR_INVALID_PATH'))
}

self._preload(pathComponents[0])
}

return exporter(ipfsPath, self._ipld, options)
}
}
Loading