Skip to content
This repository has been archived by the owner on Mar 10, 2020. It is now read-only.

refactor: convert block API to async await #1153

Merged
merged 5 commits into from
Nov 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
"stream": "readable-stream",
"ky-universal": "ky/umd",
"./src/add/form-data.js": "./src/add/form-data.browser.js",
"./src/add-from-fs/index.js": "./src/add-from-fs/index.browser.js"
"./src/add-from-fs/index.js": "./src/add-from-fs/index.browser.js",
"./src/lib/buffer-to-form-data.js": "./src/lib/buffer-to-form-data.browser.js"
},
"repository": "github:ipfs/js-ipfs-http-client",
"scripts": {
Expand Down
71 changes: 17 additions & 54 deletions src/block/get.js
Original file line number Diff line number Diff line change
@@ -1,62 +1,25 @@
'use strict'

const promisify = require('promisify-es6')
const Block = require('ipfs-block')
const CID = require('cids')
const streamToValue = require('../utils/stream-to-value')
const { Buffer } = require('buffer')
const configure = require('../lib/configure')

module.exports = (send) => {
return promisify((args, opts, callback) => {
if (typeof opts === 'function') {
callback = opts
opts = {}
}
module.exports = configure(({ ky }) => {
return async (cid, options) => {
cid = new CID(cid)
options = options || {}

// TODO this needs to be adjusted with the new go-ipfs http-api
let cid
try {
if (CID.isCID(args)) {
cid = args
args = cid.toBaseEncodedString()
} else if (Buffer.isBuffer(args)) {
cid = new CID(args)
args = cid.toBaseEncodedString()
} else if (typeof args === 'string') {
cid = new CID(args)
} else {
return callback(new Error('invalid argument'))
}
} catch (err) {
return callback(err)
}
const searchParams = new URLSearchParams(options.searchParams)
searchParams.set('arg', `${cid}`)

// Transform the response from Buffer or a Stream to a Block
const transform = (res, callback) => {
if (Buffer.isBuffer(res)) {
callback(null, new Block(res, cid))
// For empty blocks, concat-stream can't infer the encoding so we are
// passed back an empty array
} else if (Array.isArray(res) && res.length === 0) {
callback(null, new Block(Buffer.alloc(0), cid))
} else {
streamToValue(res, (err, data) => {
if (err) {
return callback(err)
}
// For empty blocks, concat-stream can't infer the encoding so we are
// passed back an empty array
if (!data.length) data = Buffer.alloc(0)
callback(null, new Block(data, cid))
})
}
}
const data = await ky.get('block/get', {
timeout: options.timeout,
signal: options.signal,
headers: options.headers,
searchParams
}).arrayBuffer()

const request = {
path: 'block/get',
args: args,
qs: opts
}

send.andTransform(request, transform, callback)
})
}
return new Block(Buffer.from(data), cid)
}
})
11 changes: 5 additions & 6 deletions src/block/index.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
'use strict'

const nodeify = require('promise-nodeify')
const moduleConfig = require('../utils/module-config')
const callbackify = require('callbackify')
const { collectify } = require('../lib/converters')

module.exports = (arg, config) => {
const send = moduleConfig(arg)
module.exports = config => {
const rm = require('./rm-async-iterator')(config)

return {
get: require('./get')(send),
stat: require('./stat')(send),
put: require('./put')(send),
get: callbackify.variadic(require('./get')(config)),
stat: callbackify.variadic(require('./stat')(config)),
put: callbackify.variadic(require('./put')(config)),
rm: (input, options, callback) => {
if (typeof options === 'function') {
callback = options
Expand Down
110 changes: 52 additions & 58 deletions src/block/put.js
Original file line number Diff line number Diff line change
@@ -1,74 +1,68 @@
'use strict'

const promisify = require('promisify-es6')
const Block = require('ipfs-block')
const CID = require('cids')
const multihash = require('multihashes')
const SendOneFile = require('../utils/send-one-file')

module.exports = (send) => {
const sendOneFile = SendOneFile(send, 'block/put')

return promisify((block, options, callback) => {
if (typeof options === 'function') {
callback = options
options = {}
}
const configure = require('../lib/configure')
const toFormData = require('../lib/buffer-to-form-data')

module.exports = configure(({ ky }) => {
async function put (data, options) {
options = options || {}

if (Array.isArray(block)) {
return callback(new Error('block.put accepts only one block'))
}

if (Buffer.isBuffer(block)) {
block = { data: block }
}

if (!block || !block.data) {
return callback(new Error('invalid block arg'))
if (Block.isBlock(data)) {
const { name, length } = multihash.decode(data.cid.multihash)
options = {
...options,
format: data.cid.codec,
mhtype: name,
mhlen: length,
version: data.cid.version
}
data = data.data
} else if (options.cid) {
const cid = new CID(options.cid)
const { name, length } = multihash.decode(cid.multihash)
options = {
...options,
format: cid.codec,
mhtype: name,
mhlen: length,
version: cid.version
}
delete options.cid
}

const qs = {}
const searchParams = new URLSearchParams(options.searchParams)
if (options.format) searchParams.set('format', options.format)
if (options.mhtype) searchParams.set('mhtype', options.mhtype)
if (options.mhlen) searchParams.set('mhlen', options.mhlen)
if (options.pin != null) searchParams.set('pin', options.pin)
if (options.version != null) searchParams.set('version', options.version)

if (block.cid || options.cid) {
let cid

try {
cid = new CID(block.cid || options.cid)
} catch (err) {
return callback(err)
let res
try {
res = await ky.post('block/put', {
timeout: options.timeout,
signal: options.signal,
headers: options.headers,
searchParams,
body: toFormData(data)
}).json()
} catch (err) {
// Retry with "protobuf"/"cbor" format for go-ipfs
// TODO: remove when https://github.com/ipfs/go-cid/issues/75 resolved
if (options.format === 'dag-pb') {
return put(data, { ...options, format: 'protobuf' })
} else if (options.format === 'dag-cbor') {
return put(data, { ...options, format: 'cbor' })
}

const { name, length } = multihash.decode(cid.multihash)

qs.format = cid.codec
qs.mhtype = name
qs.mhlen = length
qs.version = cid.version
} else {
if (options.format) qs.format = options.format
if (options.mhtype) qs.mhtype = options.mhtype
if (options.mhlen) qs.mhlen = options.mhlen
if (options.version != null) qs.version = options.version
throw err
}

sendOneFile(block.data, { qs }, (err, result) => {
if (err) {
// Retry with "protobuf"/"cbor" format for go-ipfs
// TODO: remove when https://github.com/ipfs/go-cid/issues/75 resolved
if (qs.format === 'dag-pb' || qs.format === 'dag-cbor') {
qs.format = qs.format === 'dag-pb' ? 'protobuf' : 'cbor'
return sendOneFile(block.data, { qs }, (err, result) => {
if (err) return callback(err)
callback(null, new Block(block.data, new CID(result.Key)))
})
}

return callback(err)
}
return new Block(data, new CID(res.Key))
}

callback(null, new Block(block.data, new CID(result.Key)))
})
})
}
return put
})
45 changes: 19 additions & 26 deletions src/block/stat.js
Original file line number Diff line number Diff line change
@@ -1,35 +1,28 @@
'use strict'

const promisify = require('promisify-es6')
const CID = require('cids')
const multihash = require('multihashes')
const { Buffer } = require('buffer')
const configure = require('../lib/configure')
const toCamel = require('../lib/object-to-camel')

module.exports = (send) => {
return promisify((args, opts, callback) => {
// TODO this needs to be adjusted with the new go-ipfs http-api
if (args && CID.isCID(args)) {
args = multihash.toB58String(args.multihash)
}
module.exports = configure(({ ky }) => {
return async (cid, options) => {
options = options || {}

if (typeof (opts) === 'function') {
callback = opts
opts = {}
if (Buffer.isBuffer(cid)) {
cid = new CID(cid)
}

const request = {
path: 'block/stat',
args: args,
qs: opts
}
const searchParams = new URLSearchParams(options.searchParams)
searchParams.set('arg', `${cid}`)

// Transform the response from { Key, Size } objects to { key, size } objects
const transform = (stats, callback) => {
callback(null, {
key: stats.Key,
size: stats.Size
})
}
const res = await ky.get('block/stat', {
timeout: options.timeout,
signal: options.signal,
headers: options.headers,
searchParams
}).json()

send.andTransform(request, transform, callback)
})
}
return toCamel(res)
}
})
74 changes: 23 additions & 51 deletions src/dag/get.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,67 +3,39 @@
const dagPB = require('ipld-dag-pb')
const dagCBOR = require('ipld-dag-cbor')
const raw = require('ipld-raw')
const promisify = require('promisify-es6')
const CID = require('cids')
const waterfall = require('async/waterfall')
const block = require('../block')
const configure = require('../lib/configure')

const resolvers = {
'dag-cbor': dagCBOR.resolver,
'dag-pb': dagPB.resolver,
raw: raw.resolver
}

module.exports = (send) => {
return promisify((cid, path, options, callback) => {
if (typeof path === 'function') {
callback = path
path = undefined
}

if (typeof options === 'function') {
callback = options
options = {}
}
module.exports = config => {
const getBlock = require('../block/get')(config)
const dagResolve = require('./resolve')(config)

options = options || {}
path = path || ''

if (CID.isCID(cid)) {
cid = cid.toBaseEncodedString()
}

waterfall([
cb => {
send({
path: 'dag/resolve',
args: cid + '/' + path,
qs: options
}, cb)
},
(resolved, cb) => {
block(send).get(new CID(resolved.Cid['/']), (err, ipfsBlock) => {
cb(err, ipfsBlock, resolved.RemPath)
})
},
(ipfsBlock, path, cb) => {
const dagResolver = resolvers[ipfsBlock.cid.codec]
return configure(({ ky }) => {
return async (cid, path, options) => {
if (typeof path === 'object') {
options = path
path = null
}

if (!dagResolver) {
const error = new Error(`Missing IPLD format "${ipfsBlock.cid.codec}"`)
error.missingMulticodec = ipfsBlock.cid.codec
return cb(error)
}
options = options || {}

let res
try {
res = dagResolver.resolve(ipfsBlock.data, path)
} catch (err) {
return cb(err)
}
const resolved = await dagResolve(cid, path, options)
const block = await getBlock(resolved.cid, options)
const dagResolver = resolvers[block.cid.codec]

cb(null, res)
if (!dagResolver) {
throw Object.assign(
new Error(`Missing IPLD format "${block.cid.codec}"`),
{ missingMulticodec: cid.codec }
)
}
], callback)
})

return dagResolver.resolve(block.data, resolved.remPath)
}
})(config)
}
Loading