Skip to content

Commit

Permalink
feat: adds ls streaming methods
Browse files Browse the repository at this point in the history
Implementation of ipfs-inactive/interface-js-ipfs-core#401

License: MIT
Signed-off-by: achingbrain <alex@achingbrain.net>
  • Loading branch information
achingbrain committed Nov 28, 2018
1 parent bc946c3 commit 1b07f58
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 151 deletions.
93 changes: 48 additions & 45 deletions src/cli/ls.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
'use strict'

const pull = require('pull-stream/pull')
const onEnd = require('pull-stream/sinks/on-end')
const through = require('pull-stream/throughs/through')
const {
print,
asBoolean
Expand All @@ -21,12 +24,12 @@ module.exports = {
coerce: asBoolean,
describe: 'Use long listing format.'
},
unsorted: {
alias: 'U',
sort: {
alias: 's',
type: 'boolean',
default: false,
coerce: asBoolean,
describe: 'Do not sort; list entries in directory order.'
describe: 'Sort entries by name'
},
'cid-base': {
default: 'base58btc',
Expand All @@ -39,55 +42,55 @@ module.exports = {
path,
ipfs,
long,
unsorted,
sort,
cidBase
} = argv

argv.resolve(
ipfs.files.ls(path || FILE_SEPARATOR, {
long,
unsorted,
cidBase
})
.then(files => {
if (long) {
const table = []
const lengths = {}

files.forEach(link => {
const row = {
name: `${link.name}`,
hash: `${link.hash}`,
size: `${link.size}`
new Promise((resolve, reject) => {
if (sort) {
ipfs.files.ls(path || FILE_SEPARATOR, {
long,
sort,
cidBase
})
.then(files => {
if (long) {
files.forEach(link => {
print(`${link.name}\t${link.hash}\t${link.size}`)
})
} else {
files.forEach(link => print(link.name))
}

Object.keys(row).forEach(key => {
const value = row[key]

lengths[key] = lengths[key] > value.length ? lengths[key] : value.length
})

table.push(row)
})

table.forEach(row => {
let line = ''

Object.keys(row).forEach(key => {
const value = row[key]

line += value.padEnd(lengths[key])
line += '\t'
})

print(line)
resolve()
})

return
}

files.forEach(link => print(link.name))
})
.catch(reject)

return
}

pull(
ipfs.files.lsPullStream(path, {
long,
cidBase
}),
through(file => {
if (long) {
print(`${file.name}\t${file.hash}\t${file.size}`)
} else {
print(file.name)
}
}),
onEnd((error) => {
if (error) {
return reject(error)
}

resolve()
})
)
})
)
}
}
4 changes: 3 additions & 1 deletion src/core/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ const unwrappedOperations = {
// These operations are synchronous and manage their own locking
const unwrappedSynchronousOperations = {
readPullStream: require('./read-pull-stream'),
readReadableStream: require('./read-readable-stream')
readReadableStream: require('./read-readable-stream'),
lsPullStream: require('./ls-pull-stream'),
lsReadableStream: require('./ls-readable-stream')
}

const wrap = ({
Expand Down
131 changes: 131 additions & 0 deletions src/core/ls-pull-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
'use strict'

const waterfall = require('async/waterfall')
const UnixFs = require('ipfs-unixfs')
const exporter = require('ipfs-unixfs-exporter')
const {
loadNode,
formatCid,
toMfsPath,
FILE_SEPARATOR,
FILE_TYPES
} = require('./utils')
const pull = require('pull-stream/pull')
const collect = require('pull-stream/sinks/collect')
const asyncMap = require('pull-stream/throughs/async-map')
const filter = require('pull-stream/throughs/filter')
const once = require('pull-stream/sources/once')
const error = require('pull-stream/sources/error')
const defer = require('pull-defer')

const defaultOptions = {
long: false,
cidBase: 'base58btc'
}

module.exports = (context) => {
return function mfsLs (path, options = {}) {
if (typeof path === 'object') {
options = path
path = FILE_SEPARATOR
}

options = Object.assign({}, defaultOptions, options)

options.long = options.l || options.long

const deferred = defer.source()

waterfall([
(cb) => toMfsPath(context, path, cb),
({ mfsPath, depth }, cb) => {
pull(
exporter(mfsPath, context.ipld, {
maxDepth: depth
}),

collect((err, files) => {
if (err) {
return cb(err)
}

if (files.length > 1) {
return cb(new Error(`Path ${path} had ${files.length} roots`))
}

const file = files[0]

if (!file) {
return cb(new Error(`${path} does not exist`))
}

if (file.type !== 'dir') {
return cb(null, once(file))
}

let first = true

return cb(null, pull(
exporter(mfsPath, context.ipld, {
maxDepth: depth + 1
}),
// first item in list is the directory node
filter(() => {
if (first) {
first = false
return false
}

return true
})
))
})
)
},
(source, cb) => {
cb(null,
pull(
source,

// load DAGNodes for each file
asyncMap((file, cb) => {
if (!options.long) {
return cb(null, {
name: file.name,
type: 0,
size: 0,
hash: ''
})
}

loadNode(context, {
cid: file.hash
}, (err, result) => {
if (err) {
return cb(err)
}

const meta = UnixFs.unmarshal(result.node.data)

cb(null, {
name: file.name,
type: FILE_TYPES[meta.type],
hash: formatCid(file.hash, options.cidBase),
size: meta.fileSize() || 0
})
})
})
)
)
}
], (err, source) => {
if (err) {
return deferred.resolve(error(err))
}

deferred.resolve(source)
})

return deferred
}
}
10 changes: 10 additions & 0 deletions src/core/ls-readable-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
'use strict'

const lsPullStream = require('./ls-pull-stream')
const toStream = require('pull-stream-to-stream')

module.exports = (context) => {
return function mfsLsReadableStream (path, options = {}) {
return toStream.source(lsPullStream(context)(path, options))
}
}
Loading

0 comments on commit 1b07f58

Please sign in to comment.