Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use async/await #192

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,11 @@
"moving-average": "^1.0.0",
"multicodec": "~0.5.0",
"multihashing-async": "~0.5.1",
"promisify-es6": "^1.0.3",
"protons": "^1.0.1",
"pull-length-prefixed": "^1.3.1",
"pull-stream": "^3.6.9",
"typical": "^4.0.0",
"varint-decoder": "~0.1.1"
},
"pre-push": [
Expand Down
246 changes: 101 additions & 145 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
'use strict'

const waterfall = require('async/waterfall')
const reject = require('async/reject')
const each = require('async/each')
const series = require('async/series')
const map = require('async/map')
const nextTick = require('async/nextTick')
const promisify = require('promisify-es6')
const typical = require('typical')

const WantManager = require('./want-manager')
const Network = require('./network')
const DecisionEngine = require('./decision-engine')
const Notifications = require('./notifications')
const logger = require('./utils').logger
const { logger, extendIterator } = require('./utils')
const Stats = require('./stats')

const defaultOptions = {
Expand Down Expand Up @@ -112,7 +111,7 @@ class Bitswap {
return nextTick(cb)
}

this._putBlock(block, cb)
this.put(block).then(() => cb())
}
], callback)
}
Expand Down Expand Up @@ -144,21 +143,9 @@ class Bitswap {
this._stats.disconnected(peerId)
}

_putBlock (block, callback) {
this.blockstore.put(block, (err) => {
if (err) {
return callback(err)
}

this.notifications.hasBlock(block)
this.network.provide(block.cid, (err) => {
if (err) {
this._log.error('Failed to provide: %s', err.message)
}
})

this.engine.receivedBlocks([block.cid])
callback()
_findAndConnect (cid) {
this.network.findAndConnect(cid).catch((err) => {
if (err) this._log.error(err)
})
}

Expand Down Expand Up @@ -194,88 +181,72 @@ class Bitswap {
* Fetch a given block by cid. If the block is in the local
* blockstore it is returned, otherwise the block is added to the wantlist and returned once another node sends it to us.
*
* @param {CID} cid
* @param {function(Error, Block)} callback
* @returns {void}
* @param {CID} cid - The CID of the block that should be retrieved.
* @param {Object} options
* @param {boolean} options.promptNetwork - Option whether to promptNetwork or not
* @returns {Promise.<Object>} - Returns a promise with a block corresponding with the given `cid`.
*/
get (cid, callback) {
this.getMany([cid], (err, blocks) => {
if (err) {
return callback(err)
}
async get (cid, options) {
const optionsCopy = Object.assign({}, options)

optionsCopy.promptNetwork = optionsCopy.promptNetwork || true

const getFromOutside = (cid) => {
return new Promise((resolve) => {
this.wm.wantBlocks([cid])

this.notifications.wantBlock(
cid,
// called on block receive
(block) => {
this.wm.cancelWants([cid])
resolve(block)
},
// called on unwant
() => {
this.wm.cancelWants([cid])
resolve(undefined)
}
)
})
}

if (blocks && blocks.length > 0) {
callback(null, blocks[0])
} else {
// when a unwant happens
callback()
if (await promisify(this.blockstore.has)(cid)) {
return promisify(this.blockstore.get)(cid)
} else {
if (optionsCopy.promptNetwork) {
this._findAndConnect(cid)
}
})
return getFromOutside(cid)
}
}

/**
* Fetch a a list of blocks by cid. If the blocks are in the local
* blockstore they are returned, otherwise the blocks are added to the wantlist and returned once another node sends them to us.
*
* @param {Array<CID>} cids
* @param {function(Error, Blocks)} callback
* @returns {void}
* @param {Iterable.<CID>} cids
* @returns {Iterable.<Promise.<Object>>}
*/
getMany (cids, callback) {
let pendingStart = cids.length
const wantList = []
let promptedNetwork = false

const getFromOutside = (cid, cb) => {
wantList.push(cid)

this.notifications.wantBlock(
cid,
// called on block receive
(block) => {
this.wm.cancelWants([cid])
cb(null, block)
},
// called on unwant
() => {
this.wm.cancelWants([cid])
cb(null, undefined)
}
)
getMany (cids) {
if (!typical.isIterable(cids) || typical.isString(cids) ||
Buffer.isBuffer(cids)) {
throw new Error('`cids` must be an iterable of CIDs')
}

if (!pendingStart) {
this.wm.wantBlocks(wantList)
const generator = async function * () {
let promptNetwork = true
for await (const cid of cids) {
if (promptNetwork) {
yield this.get(cid, { promptNetwork: true })
promptNetwork = false
} else {
yield this.get(cid, { promptNetwork: false })
}
}
}
}.bind(this)

map(cids, (cid, cb) => {
waterfall(
[
(cb) => this.blockstore.has(cid, cb),
(has, cb) => {
pendingStart--
if (has) {
if (!pendingStart) {
this.wm.wantBlocks(wantList)
}
return this.blockstore.get(cid, cb)
}

if (!promptedNetwork) {
promptedNetwork = true
this.network.findAndConnect(cids[0], (err) => {
if (err) {
this._log.error(err)
}
})
}

// we don't have the block here
getFromOutside(cid, cb)
}
],
cb)
}, callback)
return extendIterator(generator())
}

// removes the given cids from the wantlist independent of any ref counts
Expand All @@ -300,61 +271,53 @@ class Bitswap {
* Put the given block to the underlying blockstore and
* send it to nodes that have it in their wantlist.
*
* @param {Block} block
* @param {function(Error)} callback
* @returns {void}
* @param {Block} block - Block that should be inserted.
* @returns {Promise.<CID>} - Returns the CID of the serialized IPLD Nodes.
*/
put (block, callback) {
async put (block) {
this._log('putting block')

waterfall([
(cb) => this.blockstore.has(block.cid, cb),
(has, cb) => {
if (has) {
return nextTick(cb)
}
const has = await promisify(this.blockstore.has)(block.cid)
if (!has) {
await promisify(this.blockstore.put)(block)
this.notifications.hasBlock(block)
this.network.provide(block.cid, (err) => {
if (err) this._log.error('Failed to provide: %s', err.message)
})
this.engine.receivedBlocks([block.cid])
}

this._putBlock(block, cb)
}
], callback)
return block.cid
}

/**
* Put the given blocks to the underlying blockstore and
* send it to nodes that have it them their wantlist.
*
* @param {Array<Block>} blocks
* @param {function(Error)} callback
* @returns {void}
* @param {Iterable.<Block>} blocks
* @returns {Iterable.<Promise.<CID>>} - Returns an async iterator with the CIDs of the blocks inserted
*/
putMany (blocks, callback) {
waterfall([
(cb) => reject(blocks, (b, cb) => {
this.blockstore.has(b.cid, cb)
}, cb),
(newBlocks, cb) => this.blockstore.putMany(newBlocks, (err) => {
if (err) {
return cb(err)
putMany (blocks) {
if (!typical.isIterable(blocks)) {
throw new Error('`blocks` must be an iterable')
}

const generator = async function * () {
for await (const block of blocks) {
const has = await promisify(this.blockstore.has)(block.cid)
if (!has) {
yield this.put(block)
}
}
}.bind(this)

newBlocks.forEach((block) => {
this.notifications.hasBlock(block)
this.engine.receivedBlocks([block.cid])
this.network.provide(block.cid, (err) => {
if (err) {
this._log.error('Failed to provide: %s', err.message)
}
})
})
cb()
})
], callback)
return extendIterator(generator())
}

/**
* Get the current list of wants.
*
* @returns {Iterator<WantlistEntry>}
* @returns {Iterator.<WantlistEntry>}
*/
getWantlist () {
return this.wm.wantlist.entries()
Expand All @@ -363,7 +326,7 @@ class Bitswap {
/**
* Get the current list of partners.
*
* @returns {Array<PeerId>}
* @returns {Array.<PeerId>}
*/
peers () {
return this.engine.peers()
Expand All @@ -381,32 +344,25 @@ class Bitswap {
/**
* Start the bitswap node.
*
* @param {function(Error)} callback
*
* @returns {void}
* @returns {Promise}
*/
start (callback) {
series([
(cb) => this.wm.start(cb),
(cb) => this.network.start(cb),
(cb) => this.engine.start(cb)
], callback)
async start () {
await promisify(this.wm.start.bind(this.wm))()
await promisify(this.network.start.bind(this.network))()
await promisify(this.engine.start.bind(this.engine))()
}

/**
* Stop the bitswap node.
*
* @param {function(Error)} callback
*
* @returns {void}
* @returns {Promise}
*/
stop (callback) {
async stop () {
this._stats.stop()
series([
(cb) => this.wm.stop(cb),
(cb) => this.network.stop(cb),
(cb) => this.engine.stop(cb)
], callback)

await promisify(this.wm.stop.bind(this.wm))()
await promisify(this.network.stop.bind(this.network))()
await promisify(this.engine.stop.bind(this.engine))()
}
}

Expand Down
Loading