Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

Commit

Permalink
refactor: modularise files (#1772)
Browse files Browse the repository at this point in the history
This is basically a mechanical change to separate `files-regular.js` into multiple modules - easier to read, debug and maintain.

License: MIT
Signed-off-by: Alan Shaw <alan.shaw@protocol.ai>
  • Loading branch information
Alan Shaw authored Dec 12, 2018
1 parent 07e6c00 commit c5e5c07
Show file tree
Hide file tree
Showing 19 changed files with 703 additions and 659 deletions.
511 changes: 0 additions & 511 deletions src/core/components/files-regular.js

This file was deleted.

156 changes: 156 additions & 0 deletions src/core/components/files-regular/add-pull-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
'use strict'

const { importer } = require('ipfs-unixfs-engine')
const pull = require('pull-stream')
const toPull = require('stream-to-pull-stream')
const waterfall = require('async/waterfall')
const isStream = require('is-stream')
const isSource = require('is-pull-stream').isSource
const CID = require('cids')
const { parseChunkerString } = require('./utils')

const WRAPPER = 'wrapper/'

function noop () {}

function prepareFile (self, opts, file, callback) {
opts = opts || {}

let cid = new CID(file.multihash)

if (opts.cidVersion === 1) {
cid = cid.toV1()
}

waterfall([
(cb) => opts.onlyHash
? cb(null, file)
: self.object.get(file.multihash, Object.assign({}, opts, { preload: false }), cb),
(node, cb) => {
const b58Hash = cid.toBaseEncodedString()

let size = node.size

if (Buffer.isBuffer(node)) {
size = node.length
}

cb(null, {
path: opts.wrapWithDirectory
? file.path.substring(WRAPPER.length)
: (file.path || b58Hash),
hash: b58Hash,
size
})
}
], callback)
}

function normalizeContent (opts, content) {
if (!Array.isArray(content)) {
content = [content]
}

return content.map((data) => {
// Buffer input
if (Buffer.isBuffer(data)) {
data = { path: '', content: pull.values([data]) }
}

// Readable stream input
if (isStream.readable(data)) {
data = { path: '', content: toPull.source(data) }
}

if (isSource(data)) {
data = { path: '', content: data }
}

if (data && data.content && typeof data.content !== 'function') {
if (Buffer.isBuffer(data.content)) {
data.content = pull.values([data.content])
}

if (isStream.readable(data.content)) {
data.content = toPull.source(data.content)
}
}

if (opts.wrapWithDirectory && !data.path) {
throw new Error('Must provide a path when wrapping with a directory')
}

if (opts.wrapWithDirectory) {
data.path = WRAPPER + data.path
}

return data
})
}

function preloadFile (self, opts, file) {
const isRootFile = opts.wrapWithDirectory
? file.path === ''
: !file.path.includes('/')

const shouldPreload = isRootFile && !opts.onlyHash && opts.preload !== false

if (shouldPreload) {
self._preload(file.hash)
}

return file
}

function pinFile (self, opts, file, cb) {
// Pin a file if it is the root dir of a recursive add or the single file
// of a direct add.
const pin = 'pin' in opts ? opts.pin : true
const isRootDir = !file.path.includes('/')
const shouldPin = pin && isRootDir && !opts.onlyHash && !opts.hashAlg
if (shouldPin) {
return self.pin.add(file.hash, { preload: false }, err => cb(err, file))
} else {
cb(null, file)
}
}

module.exports = function (self) {
// Internal add func that gets used by all add funcs
return function addPullStream (options = {}) {
let chunkerOptions
try {
chunkerOptions = parseChunkerString(options.chunker)
} catch (err) {
return pull.map(() => { throw err })
}
const opts = Object.assign({}, {
shardSplitThreshold: self._options.EXPERIMENTAL.sharding
? 1000
: Infinity
}, options, chunkerOptions)

// CID v0 is for multihashes encoded with sha2-256
if (opts.hashAlg && opts.cidVersion !== 1) {
opts.cidVersion = 1
}

let total = 0

const prog = opts.progress || noop
const progress = (bytes) => {
total += bytes
prog(total)
}

opts.progress = progress
return pull(
pull.map(normalizeContent.bind(null, opts)),
pull.flatten(),
importer(self._ipld, opts),
pull.asyncMap(prepareFile.bind(null, self, opts)),
pull.map(preloadFile.bind(null, self, opts)),
pull.asyncMap(pinFile.bind(null, self, opts))
)
}
}
53 changes: 53 additions & 0 deletions src/core/components/files-regular/add-readable-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
'use strict'

const pull = require('pull-stream')
const pushable = require('pull-pushable')
const Duplex = require('readable-stream').Duplex

class AddHelper extends Duplex {
constructor (pullStream, push, options) {
super(Object.assign({ objectMode: true }, options))
this._pullStream = pullStream
this._pushable = push
this._waitingPullFlush = []
}

_read () {
this._pullStream(null, (end, data) => {
while (this._waitingPullFlush.length) {
const cb = this._waitingPullFlush.shift()
cb()
}
if (end) {
if (end instanceof Error) {
this.emit('error', end)
}
} else {
this.push(data)
}
})
}

_write (chunk, encoding, callback) {
this._waitingPullFlush.push(callback)
this._pushable.push(chunk)
}
}

module.exports = function (self) {
return (options) => {
options = options || {}

const p = pushable()
const s = pull(
p,
self.addPullStream(options)
)

const retStream = new AddHelper(s, p)

retStream.once('finish', () => p.end())

return retStream
}
}
62 changes: 62 additions & 0 deletions src/core/components/files-regular/add.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
'use strict'

const promisify = require('promisify-es6')
const pull = require('pull-stream')
const sort = require('pull-sort')
const isStream = require('is-stream')
const isSource = require('is-pull-stream').isSource
const isString = require('lodash/isString')

module.exports = function (self) {
const add = promisify((data, options, callback) => {
if (typeof options === 'function') {
callback = options
options = {}
}

options = options || {}

// Buffer, pull stream or Node.js stream
const isBufferOrStream = obj => Buffer.isBuffer(obj) || isStream.readable(obj) || isSource(obj)
// An object like { content?, path? }, where content isBufferOrStream and path isString
const isContentObject = obj => {
if (typeof obj !== 'object') return false
// path is optional if content is present
if (obj.content) return isBufferOrStream(obj.content)
// path must be a non-empty string if no content
return Boolean(obj.path) && isString(obj.path)
}
// An input atom: a buffer, stream or content object
const isInput = obj => isBufferOrStream(obj) || isContentObject(obj)
// All is ok if data isInput or data is an array of isInput
const ok = isInput(data) || (Array.isArray(data) && data.every(isInput))

if (!ok) {
return callback(new Error('invalid input: expected buffer, readable stream, pull stream, object or array of objects'))
}

pull(
pull.values([data]),
self.addPullStream(options),
sort((a, b) => {
if (a.path < b.path) return 1
if (a.path > b.path) return -1
return 0
}),
pull.collect(callback)
)
})

return function () {
const args = Array.from(arguments)

// If we .add(<pull stream>), then promisify thinks the pull stream
// is a callback! Add an empty options object in this case so that a
// promise is returned.
if (args.length === 1 && isSource(args[0])) {
args.push({})
}

return add.apply(null, args)
}
}
56 changes: 56 additions & 0 deletions src/core/components/files-regular/cat-pull-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
'use strict'

const { exporter } = require('ipfs-unixfs-engine')
const pull = require('pull-stream')
const deferred = require('pull-defer')
const { normalizePath } = require('./utils')

module.exports = function (self) {
return function catPullStream (ipfsPath, options) {
if (typeof ipfsPath === 'function') {
throw new Error('You must supply an ipfsPath')
}

options = options || {}

ipfsPath = normalizePath(ipfsPath)
const pathComponents = ipfsPath.split('/')
const restPath = normalizePath(pathComponents.slice(1).join('/'))
const filterFile = (file) => (restPath && file.path === restPath) || (file.path === ipfsPath)

if (options.preload !== false) {
self._preload(pathComponents[0])
}

const d = deferred.source()

pull(
exporter(ipfsPath, self._ipld, options),
pull.filter(filterFile),
pull.take(1),
pull.collect((err, files) => {
if (err) {
return d.abort(err)
}

if (!files.length) {
return d.abort(new Error('No such file'))
}

const file = files[0]

if (!file.content && file.type === 'dir') {
return d.abort(new Error('this dag node is a directory'))
}

if (!file.content) {
return d.abort(new Error('this dag node has no content'))
}

d.resolve(file.content)
})
)

return d
}
}
7 changes: 7 additions & 0 deletions src/core/components/files-regular/cat-readable-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
'use strict'

const toStream = require('pull-stream-to-stream')

module.exports = function (self) {
return (ipfsPath, options) => toStream.source(self.catPullStream(ipfsPath, options))
}
21 changes: 21 additions & 0 deletions src/core/components/files-regular/cat.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
'use strict'

const promisify = require('promisify-es6')
const pull = require('pull-stream')

module.exports = function (self) {
return promisify((ipfsPath, options, callback) => {
if (typeof options === 'function') {
callback = options
options = {}
}

pull(
self.catPullStream(ipfsPath, options),
pull.collect((err, buffers) => {
if (err) { return callback(err) }
callback(null, Buffer.concat(buffers))
})
)
})
}
26 changes: 26 additions & 0 deletions src/core/components/files-regular/get-pull-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
'use strict'

const { exporter } = require('ipfs-unixfs-engine')
const pull = require('pull-stream')
const errCode = require('err-code')
const { normalizePath } = require('./utils')

module.exports = function (self) {
return (ipfsPath, options) => {
options = options || {}

if (options.preload !== false) {
let pathComponents

try {
pathComponents = normalizePath(ipfsPath).split('/')
} catch (err) {
return pull.error(errCode(err, 'ERR_INVALID_PATH'))
}

self._preload(pathComponents[0])
}

return exporter(ipfsPath, self._ipld, options)
}
}
Loading

0 comments on commit c5e5c07

Please sign in to comment.