From ea6f6b706360124aa82c3e84ac6f10402aa5abf0 Mon Sep 17 00:00:00 2001 From: nijynot Date: Sun, 14 Apr 2019 20:06:27 +0200 Subject: [PATCH 1/5] feat: make `get()` a generator * make `getMany()` AsyncIterable --- package.json | 2 + src/index.js | 135 ++++++++-------- src/utils.js | 32 +++- test/bitswap-mock-internals.js | 193 +++++++++++------------ test/bitswap-stats.js | 5 +- test/bitswap.js | 35 ++-- test/network/gen-bitswap-network.node.js | 33 ++-- 7 files changed, 220 insertions(+), 215 deletions(-) diff --git a/package.json b/package.json index ab7ceb17..9ce9a6e0 100644 --- a/package.json +++ b/package.json @@ -76,9 +76,11 @@ "moving-average": "^1.0.0", "multicodec": "~0.5.0", "multihashing-async": "~0.5.1", + "promisify-es6": "^1.0.3", "protons": "^1.0.1", "pull-length-prefixed": "^1.3.1", "pull-stream": "^3.6.9", + "typical": "^4.0.0", "varint-decoder": "~0.1.1" }, "pre-push": [ diff --git a/src/index.js b/src/index.js index fddfac23..6619ad7d 100644 --- a/src/index.js +++ b/src/index.js @@ -4,14 +4,15 @@ const waterfall = require('async/waterfall') const reject = require('async/reject') const each = require('async/each') const series = require('async/series') -const map = require('async/map') const nextTick = require('async/nextTick') +const promisify = require('promisify-es6') +const typical = require('typical') const WantManager = require('./want-manager') const Network = require('./network') const DecisionEngine = require('./decision-engine') const Notifications = require('./notifications') -const logger = require('./utils').logger +const { logger, extendIterator } = require('./utils') const Stats = require('./stats') const defaultOptions = { @@ -162,6 +163,14 @@ class Bitswap { }) } + _findAndConnect (cid) { + if (this.promptNetwork) { + this.network.findAndConnect(cid, (err) => { + if (err) this._log.error(err) + }) + } + } + enableStats () { this._stats.enable() } @@ -194,88 +203,72 @@ class Bitswap { * Fetch a given block by cid. If the block is in the local * blockstore it is returned, otherwise the block is added to the wantlist and returned once another node sends it to us. * - * @param {CID} cid - * @param {function(Error, Block)} callback - * @returns {void} + * @param {CID} cid - The CID of the block that should be retrieved. + * @param {Object} options + * @param {boolean} options.promptNetwork - Option whether to promptNetwork or not + * @returns {Promise.} - Returns a promise with a block corresponding with the given `cid`. */ - get (cid, callback) { - this.getMany([cid], (err, blocks) => { - if (err) { - return callback(err) - } + async get (cid, options) { + const optionsCopy = Object.assign({}, options) + + optionsCopy.promptNetwork = optionsCopy.promptNetwork || true + + const getFromOutside = (cid) => { + return new Promise((resolve) => { + this.wm.wantBlocks([cid]) + + this.notifications.wantBlock( + cid, + // called on block receive + (block) => { + this.wm.cancelWants([cid]) + resolve(block) + }, + // called on unwant + () => { + this.wm.cancelWants([cid]) + resolve(undefined) + } + ) + }) + } - if (blocks && blocks.length > 0) { - callback(null, blocks[0]) - } else { - // when a unwant happens - callback() + if (await promisify(this.blockstore.has)(cid)) { + return promisify(this.blockstore.get)(cid) + } else { + if (optionsCopy.promptNetwork) { + this._findAndConnect(cid) } - }) + return getFromOutside(cid) + } } /** * Fetch a a list of blocks by cid. If the blocks are in the local * blockstore they are returned, otherwise the blocks are added to the wantlist and returned once another node sends them to us. * - * @param {Array} cids - * @param {function(Error, Blocks)} callback - * @returns {void} + * @param {Iterable.} cids + * @returns {Iterable.>} */ - getMany (cids, callback) { - let pendingStart = cids.length - const wantList = [] - let promptedNetwork = false - - const getFromOutside = (cid, cb) => { - wantList.push(cid) - - this.notifications.wantBlock( - cid, - // called on block receive - (block) => { - this.wm.cancelWants([cid]) - cb(null, block) - }, - // called on unwant - () => { - this.wm.cancelWants([cid]) - cb(null, undefined) - } - ) - - if (!pendingStart) { - this.wm.wantBlocks(wantList) - } + getMany (cids) { + if (!typical.isIterable(cids) || typical.isString(cids) || + Buffer.isBuffer(cids)) { + throw new Error('`cids` must be an iterable of CIDs') } - map(cids, (cid, cb) => { - waterfall( - [ - (cb) => this.blockstore.has(cid, cb), - (has, cb) => { - pendingStart-- - if (has) { - if (!pendingStart) { - this.wm.wantBlocks(wantList) - } - return this.blockstore.get(cid, cb) - } - - if (!promptedNetwork) { - promptedNetwork = true - this.network.findAndConnect(cids[0], (err) => { - if (err) { - this._log.error(err) - } - }) - } + const generator = async function * () { + let promptNetwork = true + for await (const cid of cids) { + if (promptNetwork) { + yield this.get(cid, { promptNetwork: true }) + promptNetwork = false + } else { + yield this.get(cid, { promptNetwork: false }) + } + } + }.bind(this) - // we don't have the block here - getFromOutside(cid, cb) - } - ], - cb) - }, callback) + return extendIterator(generator()) } // removes the given cids from the wantlist independent of any ref counts diff --git a/src/utils.js b/src/utils.js index 25f6bf32..34abe623 100644 --- a/src/utils.js +++ b/src/utils.js @@ -80,11 +80,41 @@ const sortBy = (fn, list) => { }) } +const first = async (iterator) => { + for await (const value of iterator) { + return value + } +} + +const last = async (iterator) => { + let value + for await (value of iterator) { + // Intentionally empty + } + return value +} + +const all = async (iterator) => { + const values = [] + for await (const value of iterator) { + values.push(value) + } + return values +} + +const extendIterator = (iterator) => { + iterator.first = () => first(iterator) + iterator.last = () => last(iterator) + iterator.all = () => all(iterator) + return iterator +} + module.exports = { logger, includesWith, uniqWith, groupBy, pullAllWith, - sortBy + sortBy, + extendIterator } diff --git a/test/bitswap-mock-internals.js b/test/bitswap-mock-internals.js index 24c39d92..89c656ea 100644 --- a/test/bitswap-mock-internals.js +++ b/test/bitswap-mock-internals.js @@ -3,7 +3,6 @@ 'use strict' const eachSeries = require('async/eachSeries') -const waterfall = require('async/waterfall') const map = require('async/map') const parallel = require('async/parallel') const setImmediate = require('async/setImmediate') @@ -12,6 +11,7 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect const PeerId = require('peer-id') +const promisify = require('promisify-es6') const Message = require('../src/types/message') const Bitswap = require('../src') @@ -160,79 +160,71 @@ describe('bitswap with mocks', function () { }) describe('get', () => { - it('fails on requesting empty block', (done) => { + it('fails on requesting empty block', async () => { const bs = new Bitswap(mockLibp2pNode(), repo.blocks) - bs.get(null, (err, res) => { + try { + await bs.get(null) + } catch (err) { expect(err).to.exist() expect(err.message).to.equal('Not a valid cid') - done() - }) + } }) - it('block exists locally', (done) => { + it('block exists locally', async () => { const block = blocks[4] + await promisify(repo.blocks.put)(block) + const bs = new Bitswap(mockLibp2pNode(), repo.blocks) - repo.blocks.put(block, (err) => { - expect(err).to.not.exist() - const bs = new Bitswap(mockLibp2pNode(), repo.blocks) - - bs.get(block.cid, (err, res) => { - expect(err).to.not.exist() - expect(res).to.eql(block) - done() - }) - }) + const retrievedBlock = await bs.get(block.cid) + expect(retrievedBlock).to.eql(block) }) - it('blocks exist locally', (done) => { + it('blocks exist locally', async () => { const b1 = blocks[3] const b2 = blocks[14] const b3 = blocks[13] - repo.blocks.putMany([b1, b2, b3], (err) => { - expect(err).to.not.exist() - - const bs = new Bitswap(mockLibp2pNode(), repo.blocks) + await promisify(repo.blocks.putMany)([b1, b2, b3]) + const bs = new Bitswap(mockLibp2pNode(), repo.blocks) - bs.getMany([b1.cid, b2.cid, b3.cid], (err, res) => { - expect(err).to.not.exist() - expect(res).to.be.eql([b1, b2, b3]) - done() - }) - }) + const gen = bs.getMany([b1.cid, b2.cid, b3.cid]) + const retrievedBlocks = [] + for await (const block of gen) { + retrievedBlocks.push(block) + } + expect(retrievedBlocks).to.be.eql([b1, b2, b3]) }) - it('getMany', (done) => { + it('getMany', async () => { const b1 = blocks[5] const b2 = blocks[6] const b3 = blocks[7] - repo.blocks.putMany([b1, b2, b3], (err) => { - expect(err).to.not.exist() + await promisify(repo.blocks.putMany)([b1, b2, b3]) + const bs = new Bitswap(mockLibp2pNode(), repo.blocks) - const bs = new Bitswap(mockLibp2pNode(), repo.blocks) + const block1 = await bs.get(b1.cid) + expect(block1).to.eql(b1) - map([b1.cid, b2.cid, b3.cid], (cid, cb) => bs.get(cid, cb), (err, res) => { - expect(err).to.not.exist() - expect(res).to.eql([b1, b2, b3]) - done() - }) - }) + const block2 = await bs.get(b2.cid) + expect(block2).to.eql(b2) + + const block3 = await bs.get(b3.cid) + expect(block3).to.eql(b3) }) it('block is added locally afterwards', (done) => { - const finish = orderedFinish(2, done) - const block = blocks[9] - const bs = new Bitswap(mockLibp2pNode(), repo.blocks) - const net = mockNetwork() + (async () => { + const finish = orderedFinish(2, done) + const block = blocks[9] + const bs = new Bitswap(mockLibp2pNode(), repo.blocks) + const net = mockNetwork() - bs.network = net - bs.wm.network = net - bs.engine.network = net - bs.start((err) => { - expect(err).to.not.exist() - bs.get(block.cid, (err, res) => { - expect(err).to.not.exist() + bs.network = net + bs.wm.network = net + bs.engine.network = net + await promisify(bs.start.bind(bs))() + bs.get(block.cid).then((res) => { expect(res).to.eql(block) finish(2) }) @@ -241,7 +233,7 @@ describe('bitswap with mocks', function () { finish(1) bs.put(block, () => {}) }, 200) - }) + })() }) it('block is sent after local add', (done) => { @@ -307,39 +299,33 @@ describe('bitswap with mocks', function () { setImmediate(() => callback()) } } - bs1 = new Bitswap(mockLibp2pNode(), repo.blocks) - applyNetwork(bs1, n1) - - bs1.start((err) => { - expect(err).to.not.exist() - let repo2 + // Do not remove semi-colon. Will break the test. + ;(async () => { + // Create and start bs1 + bs1 = new Bitswap(mockLibp2pNode(), repo.blocks) + applyNetwork(bs1, n1) + await promisify(bs1.start.bind(bs1))() - waterfall([ - (cb) => createTempRepo(cb), - (repo, cb) => { - repo2 = repo - bs2 = new Bitswap(mockLibp2pNode(), repo2.blocks) - applyNetwork(bs2, n2) - bs2.start((err) => { - expect(err).to.not.exist() + // Create and start bs2 + const repo2 = await promisify(createTempRepo)() + bs2 = new Bitswap(mockLibp2pNode(), repo2.blocks) + applyNetwork(bs2, n2) + await promisify(bs2.start.bind(bs2))() - bs1._onPeerConnected(other) - bs2._onPeerConnected(me) + bs1._onPeerConnected(other) + bs2._onPeerConnected(me) - bs1.get(block.cid, (err, res) => { - expect(err).to.not.exist() - cb(null, res) - }) - setTimeout(() => bs2.put(block, () => {}), 1000) - }) - }, - (res, cb) => { - expect(res).to.eql(block) - cb() - } - ], done) - }) + bs1.get(block.cid).then((res) => { + expect(res).to.eql(block) + done() + }).catch((err) => { + expect(err).to.not.exist() + }) + setTimeout(() => { + bs2.put(block, () => {}) + }, 1000) + })() }) it('double get', (done) => { @@ -347,18 +333,16 @@ describe('bitswap with mocks', function () { const bs = new Bitswap(mockLibp2pNode(), repo.blocks) - parallel( - [ - (cb) => bs.get(block.cid, cb), - (cb) => bs.get(block.cid, cb) - ], - (err, res) => { - expect(err).to.not.exist() - expect(res[0]).to.eql(block) - expect(res[1]).to.eql(block) - done() - } - ) + Promise.all([ + bs.get(block.cid), + bs.get(block.cid) + ]).then((res) => { + expect(res[0]).to.eql(block) + expect(res[1]).to.eql(block) + done() + }).catch((err) => { + expect(err).to.not.exist() + }) bs.put(block, (err) => { expect(err).to.not.exist() @@ -368,24 +352,23 @@ describe('bitswap with mocks', function () { describe('unwant', () => { it('removes blocks that are wanted multiple times', (done) => { - const bs = new Bitswap(mockLibp2pNode(), repo.blocks) - bs.start((err) => { - expect(err).to.not.exist() - const b = blocks[12] - - let counter = 0 - const check = (err, res) => { - expect(err).to.not.exist() - expect(res).to.not.exist() - - if (++counter === 2) { done() } - } + (async () => { + const bs = new Bitswap(mockLibp2pNode(), repo.blocks) + await promisify(bs.start.bind(bs))() - bs.get(b.cid, check) - bs.get(b.cid, check) + const b = blocks[12] + Promise.all([ + bs.get(b.cid), + bs.get(b.cid) + ]).then((res) => { + expect(res[1]).to.not.exist() + done() + }).catch((e) => { + expect(e).to.not.exist() + }) setTimeout(() => bs.unwant(b.cid), 10) - }) + })() }) }) diff --git a/test/bitswap-stats.js b/test/bitswap-stats.js index 73c27a48..211a708f 100644 --- a/test/bitswap-stats.js +++ b/test/bitswap-stats.js @@ -231,10 +231,11 @@ describe('bitswap stats', () => { finish() }) - bs2.get(block.cid, (err, block) => { - expect(err).to.not.exist() + bs2.get(block.cid).then(() => { expect(block).to.exist() finish() + }).catch((err) => { + expect(err).to.not.exist() }) }) diff --git a/test/bitswap.js b/test/bitswap.js index ec1b52f2..f3704879 100644 --- a/test/bitswap.js +++ b/test/bitswap.js @@ -6,6 +6,7 @@ const waterfall = require('async/waterfall') const series = require('async/series') const each = require('async/each') const parallel = require('async/parallel') +const promisify = require('promisify-es6') const chai = require('chai') chai.use(require('dirty-chai')) @@ -78,15 +79,13 @@ describe('bitswap without DHT', function () { }) it('put a block in 2, fail to get it in 0', (done) => { - const finish = orderedFinish(2, done) + (async () => { + const finish = orderedFinish(2, done) - waterfall([ - (cb) => makeBlock(cb), - (block, cb) => nodes[2].bitswap.put(block, () => cb(null, block)) - ], (err, block) => { - expect(err).to.not.exist() - nodes[0].bitswap.get(block.cid, (err, block) => { - expect(err).to.not.exist() + const block = await promisify(makeBlock)() + await promisify(nodes[2].bitswap.put.bind(nodes[2].bitswap))(block) + + nodes[0].bitswap.get(block.cid).then((block) => { expect(block).to.not.exist() finish(2) }) @@ -95,7 +94,7 @@ describe('bitswap without DHT', function () { finish(1) nodes[0].bitswap.unwant(block.cid) }, 200) - }) + })() }) }) @@ -134,16 +133,12 @@ describe('bitswap with DHT', function () { ], done) }) - it('put a block in 2, get it in 0', function (done) { - waterfall([ - (cb) => makeBlock(cb), - (block, cb) => nodes[2].bitswap.put(block, () => cb(null, block)), - (block, cb) => nodes[0].bitswap.get(block.cid, (err, blockRetrieved) => { - expect(err).to.not.exist() - expect(block.data).to.eql(blockRetrieved.data) - expect(block.cid).to.eql(blockRetrieved.cid) - cb() - }) - ], done) + it('put a block in 2, get it in 0', async () => { + const block = await promisify(makeBlock)() + await promisify(nodes[2].bitswap.put.bind(nodes[2].bitswap))(block) + + const blockRetrieved = await nodes[0].bitswap.get(block.cid) + expect(block.data).to.eql(blockRetrieved.data) + expect(block.cid).to.eql(blockRetrieved.cid) }) }) diff --git a/test/network/gen-bitswap-network.node.js b/test/network/gen-bitswap-network.node.js index 2cde6492..72398ab6 100644 --- a/test/network/gen-bitswap-network.node.js +++ b/test/network/gen-bitswap-network.node.js @@ -49,10 +49,9 @@ describe('gen Bitswap network', function () { (b, cb) => node.bitswap.put(b, cb), cb ), - (cb) => map(_.range(100), (i, cb) => { - node.bitswap.get(blocks[i].cid, cb) - }, (err, res) => { - expect(err).to.not.exist() + (cb) => Promise.all(_.range(100).map((i) => { + return node.bitswap.get(blocks[i].cid) + })).then((res) => { expect(res).to.have.length(blocks.length) cb() }) @@ -119,19 +118,21 @@ function round (nodeArr, n, cb) { }), cb), (cb) => { d = (new Date()).getTime() - // fetch all blocks on every node - parallel(_.map(nodeArr, (node, i) => (cb) => { - map(cids, (cid, cb) => { - node.bitswap.get(cid, cb) - }, (err, res) => { - if (err) { - return cb(err) - } - expect(res).to.have.length(blocks.length) - cb() - }) - }), cb) + // fetch all blocks on every node + Promise.all(nodeArr.map((node) => { + return Promise.all( + cids.map((cid) => { + return node.bitswap.get(cid) + }) + ) + })).then((res) => { + expect(res[0]).to.deep.equal(blocks) + expect(res[1]).to.deep.equal(blocks) + cb() + }).catch((err) => { + cb(err) + }) } ], (err) => { if (err) { From 71562ab0d03fba8cb0b8ab0c4242f24a51749205 Mon Sep 17 00:00:00 2001 From: nijynot Date: Sun, 14 Apr 2019 21:03:27 +0200 Subject: [PATCH 2/5] feat: make `put()` a generator * make `putMany()` AsyncIterable * remove check in `_findAndConnect()` --- src/index.js | 92 +++++++++--------------- test/bitswap-stats.js | 4 +- test/bitswap.js | 6 +- test/network/gen-bitswap-network.node.js | 4 +- 4 files changed, 40 insertions(+), 66 deletions(-) diff --git a/src/index.js b/src/index.js index 6619ad7d..abfebd6f 100644 --- a/src/index.js +++ b/src/index.js @@ -113,7 +113,7 @@ class Bitswap { return nextTick(cb) } - this._putBlock(block, cb) + this.put(block).then(() => cb()) } ], callback) } @@ -145,30 +145,10 @@ class Bitswap { this._stats.disconnected(peerId) } - _putBlock (block, callback) { - this.blockstore.put(block, (err) => { - if (err) { - return callback(err) - } - - this.notifications.hasBlock(block) - this.network.provide(block.cid, (err) => { - if (err) { - this._log.error('Failed to provide: %s', err.message) - } - }) - - this.engine.receivedBlocks([block.cid]) - callback() - }) - } - _findAndConnect (cid) { - if (this.promptNetwork) { - this.network.findAndConnect(cid, (err) => { - if (err) this._log.error(err) - }) - } + this.network.findAndConnect(cid, (err) => { + if (err) this._log.error(err) + }) } enableStats () { @@ -293,55 +273,47 @@ class Bitswap { * Put the given block to the underlying blockstore and * send it to nodes that have it in their wantlist. * - * @param {Block} block - * @param {function(Error)} callback - * @returns {void} + * @param {Block} block - Block that should be inserted. + * @returns {Promise.} - Returns the CID of the serialized IPLD Nodes. */ - put (block, callback) { + async put (block) { this._log('putting block') - waterfall([ - (cb) => this.blockstore.has(block.cid, cb), - (has, cb) => { - if (has) { - return nextTick(cb) - } + const has = await promisify(this.blockstore.has)(block.cid) + if (!has) { + await promisify(this.blockstore.put)(block) + this.notifications.hasBlock(block) + this.network.provide(block.cid, (err) => { + if (err) this._log.error('Failed to provide: %s', err.message) + }) + this.engine.receivedBlocks([block.cid]) + } - this._putBlock(block, cb) - } - ], callback) + return block.cid } /** * Put the given blocks to the underlying blockstore and * send it to nodes that have it them their wantlist. * - * @param {Array} blocks - * @param {function(Error)} callback - * @returns {void} + * @param {Iterable.} blocks + * @returns {Iterable.>} - Returns an async iterator with the CIDs of the blocks inserted */ - putMany (blocks, callback) { - waterfall([ - (cb) => reject(blocks, (b, cb) => { - this.blockstore.has(b.cid, cb) - }, cb), - (newBlocks, cb) => this.blockstore.putMany(newBlocks, (err) => { - if (err) { - return cb(err) + putMany (blocks) { + if (!typical.isIterable(blocks)) { + throw new Error('`blocks` must be an iterable') + } + + const generator = async function * () { + for await (const block of blocks) { + const has = await promisify(this.blockstore.has)(block.cid) + if (!has) { + yield this.put(block) } + } + }.bind(this) - newBlocks.forEach((block) => { - this.notifications.hasBlock(block) - this.engine.receivedBlocks([block.cid]) - this.network.provide(block.cid, (err) => { - if (err) { - this._log.error('Failed to provide: %s', err.message) - } - }) - }) - cb() - }) - ], callback) + return extendIterator(generator()) } /** diff --git a/test/bitswap-stats.js b/test/bitswap-stats.js index 211a708f..f8fb339f 100644 --- a/test/bitswap-stats.js +++ b/test/bitswap-stats.js @@ -212,8 +212,8 @@ describe('bitswap stats', () => { }) }) - before((done) => { - bs.put(block, done) + before(async () => { + await bs.put(block) }) it('updates stats on transfer', (done) => { diff --git a/test/bitswap.js b/test/bitswap.js index f3704879..f038dfc1 100644 --- a/test/bitswap.js +++ b/test/bitswap.js @@ -83,7 +83,8 @@ describe('bitswap without DHT', function () { const finish = orderedFinish(2, done) const block = await promisify(makeBlock)() - await promisify(nodes[2].bitswap.put.bind(nodes[2].bitswap))(block) + await nodes[2].bitswap.put(block) + // await promisify(nodes[2].bitswap.put.bind(nodes[2].bitswap))(block) nodes[0].bitswap.get(block.cid).then((block) => { expect(block).to.not.exist() @@ -135,7 +136,8 @@ describe('bitswap with DHT', function () { it('put a block in 2, get it in 0', async () => { const block = await promisify(makeBlock)() - await promisify(nodes[2].bitswap.put.bind(nodes[2].bitswap))(block) + nodes[2].bitswap.put(block) + // await promisify(nodes[2].bitswap.put.bind(nodes[2].bitswap))(block) const blockRetrieved = await nodes[0].bitswap.get(block.cid) expect(block.data).to.eql(blockRetrieved.data) diff --git a/test/network/gen-bitswap-network.node.js b/test/network/gen-bitswap-network.node.js index 72398ab6..6cc8685b 100644 --- a/test/network/gen-bitswap-network.node.js +++ b/test/network/gen-bitswap-network.node.js @@ -46,7 +46,7 @@ describe('gen Bitswap network', function () { }), (cb) => each( blocks, - (b, cb) => node.bitswap.put(b, cb), + (b, cb) => node.bitswap.put(b).then(() => cb()), cb ), (cb) => Promise.all(_.range(100).map((i) => { @@ -114,7 +114,7 @@ function round (nodeArr, n, cb) { return blocks[index] }) - each(data, (d, cb) => node.bitswap.put(d, cb), cb) + each(data, (d, cb) => node.bitswap.put(d).then(() => cb()), cb) }), cb), (cb) => { d = (new Date()).getTime() From 660d8c5487e1f55d67b934ba30851dcb5bd17749 Mon Sep 17 00:00:00 2001 From: nijynot Date: Sun, 14 Apr 2019 21:37:52 +0200 Subject: [PATCH 3/5] feat: make `start()` and `stop()` async/await --- src/index.js | 35 +++++++++++++--------------------- test/bitswap-mock-internals.js | 35 +++++++++++++++++++--------------- test/bitswap-stats.js | 10 ++++++---- test/bitswap.js | 6 +++--- 4 files changed, 42 insertions(+), 44 deletions(-) diff --git a/src/index.js b/src/index.js index abfebd6f..0fbab810 100644 --- a/src/index.js +++ b/src/index.js @@ -1,9 +1,7 @@ 'use strict' const waterfall = require('async/waterfall') -const reject = require('async/reject') const each = require('async/each') -const series = require('async/series') const nextTick = require('async/nextTick') const promisify = require('promisify-es6') const typical = require('typical') @@ -319,7 +317,7 @@ class Bitswap { /** * Get the current list of wants. * - * @returns {Iterator} + * @returns {Iterator.} */ getWantlist () { return this.wm.wantlist.entries() @@ -328,7 +326,7 @@ class Bitswap { /** * Get the current list of partners. * - * @returns {Array} + * @returns {Array.} */ peers () { return this.engine.peers() @@ -346,32 +344,25 @@ class Bitswap { /** * Start the bitswap node. * - * @param {function(Error)} callback - * - * @returns {void} + * @returns {Promise} */ - start (callback) { - series([ - (cb) => this.wm.start(cb), - (cb) => this.network.start(cb), - (cb) => this.engine.start(cb) - ], callback) + async start () { + await promisify(this.wm.start.bind(this.wm))() + await promisify(this.network.start.bind(this.network))() + await promisify(this.engine.start.bind(this.engine))() } /** * Stop the bitswap node. * - * @param {function(Error)} callback - * - * @returns {void} + * @returns {Promise} */ - stop (callback) { + async stop () { this._stats.stop() - series([ - (cb) => this.wm.stop(cb), - (cb) => this.network.stop(cb), - (cb) => this.engine.stop(cb) - ], callback) + + await promisify(this.wm.stop.bind(this.wm))() + await promisify(this.network.stop.bind(this.network))() + await promisify(this.engine.stop.bind(this.engine))() } } diff --git a/test/bitswap-mock-internals.js b/test/bitswap-mock-internals.js index 89c656ea..5f16f9d0 100644 --- a/test/bitswap-mock-internals.js +++ b/test/bitswap-mock-internals.js @@ -55,9 +55,11 @@ describe('bitswap with mocks', function () { describe('receive message', () => { it('simple block message', (done) => { - const bs = new Bitswap(mockLibp2pNode(), repo.blocks) - bs.start((err) => { - expect(err).to.not.exist() + (async () => { + const bs = new Bitswap(mockLibp2pNode(), repo.blocks) + await bs.start().catch((err) => { + expect(err).to.not.exist() + }) const other = ids[1] @@ -86,13 +88,16 @@ describe('bitswap with mocks', function () { done() }) }) - }) + })() }) it('simple want message', (done) => { - const bs = new Bitswap(mockLibp2pNode(), repo.blocks) - bs.start((err) => { - expect(err).to.not.exist() + (async () => { + const bs = new Bitswap(mockLibp2pNode(), repo.blocks) + await bs.start().catch((err) => { + expect(err).to.not.exist() + }) + const other = ids[1] const b1 = blocks[0] const b2 = blocks[1] @@ -112,7 +117,7 @@ describe('bitswap with mocks', function () { done() }) - }) + })() }) it('multi peer', function (done) { @@ -122,8 +127,8 @@ describe('bitswap with mocks', function () { let others let blocks - bs.start((err) => { - expect(err).to.not.exist() + (async () => { + await bs.start() parallel([ (cb) => map(_.range(5), (i, cb) => PeerId.create({ bits: 512 }, cb), cb), @@ -155,7 +160,7 @@ describe('bitswap with mocks', function () { }, done) }) } - }) + })() }) }) @@ -223,7 +228,7 @@ describe('bitswap with mocks', function () { bs.network = net bs.wm.network = net bs.engine.network = net - await promisify(bs.start.bind(bs))() + await bs.start() bs.get(block.cid).then((res) => { expect(res).to.eql(block) finish(2) @@ -305,13 +310,13 @@ describe('bitswap with mocks', function () { // Create and start bs1 bs1 = new Bitswap(mockLibp2pNode(), repo.blocks) applyNetwork(bs1, n1) - await promisify(bs1.start.bind(bs1))() + await bs1.start() // Create and start bs2 const repo2 = await promisify(createTempRepo)() bs2 = new Bitswap(mockLibp2pNode(), repo2.blocks) applyNetwork(bs2, n2) - await promisify(bs2.start.bind(bs2))() + await bs2.start() bs1._onPeerConnected(other) bs2._onPeerConnected(me) @@ -354,7 +359,7 @@ describe('bitswap with mocks', function () { it('removes blocks that are wanted multiple times', (done) => { (async () => { const bs = new Bitswap(mockLibp2pNode(), repo.blocks) - await promisify(bs.start.bind(bs))() + await bs.start() const b = blocks[12] Promise.all([ diff --git a/test/bitswap-stats.js b/test/bitswap-stats.js index f8fb339f..8209715c 100644 --- a/test/bitswap-stats.js +++ b/test/bitswap-stats.js @@ -90,9 +90,11 @@ describe('bitswap stats', () => { }) // start the first bitswap - before((done) => bs.start(done)) + before((done) => { + bs.start().then(() => done()) + }) - after((done) => each(bitswaps, (bs, cb) => bs.stop(cb), done)) + after((done) => each(bitswaps, (bs, cb) => bs.stop().then(() => cb()), done)) after((done) => each(repos, (repo, cb) => repo.teardown(cb), done)) @@ -196,11 +198,11 @@ describe('bitswap stats', () => { before((done) => { bs2 = bitswaps[1] - bs2.start(done) + bs2.start().then(() => done()) }) after((done) => { - bs2.stop(done) + bs2.stop().then(() => done()) }) before((done) => { diff --git a/test/bitswap.js b/test/bitswap.js index f038dfc1..36f9ce04 100644 --- a/test/bitswap.js +++ b/test/bitswap.js @@ -30,7 +30,7 @@ function createThing (dht, callback) { }, (repo, libp2pNode, cb) => { const bitswap = new Bitswap(libp2pNode, repo.blocks) - bitswap.start((err) => cb(err, repo, libp2pNode, bitswap)) + bitswap.start().then((err) => cb(err, repo, libp2pNode, bitswap)) } ], (err, repo, libp2pNode, bitswap) => { expect(err).to.not.exist() @@ -64,7 +64,7 @@ describe('bitswap without DHT', function () { after((done) => { each(nodes, (node, cb) => { series([ - (cb) => node.bitswap.stop(cb), + (cb) => node.bitswap.stop().then(() => cb()), (cb) => node.libp2pNode.stop(cb), (cb) => node.repo.teardown(cb) ], cb) @@ -120,7 +120,7 @@ describe('bitswap with DHT', function () { after((done) => { each(nodes, (node, cb) => { series([ - (cb) => node.bitswap.stop(cb), + (cb) => node.bitswap.stop().then(() => cb()), (cb) => node.libp2pNode.stop(cb), (cb) => node.repo.teardown(cb) ], cb) From 9241589d73fe385b6055cfdf8c2ffbf596a3938f Mon Sep 17 00:00:00 2001 From: nijynot Date: Fri, 19 Apr 2019 19:01:26 +0200 Subject: [PATCH 4/5] refactor: make `connectTo()` async/await --- src/network.js | 17 +++++++++++++---- src/want-manager/msg-queue.js | 15 +++++++-------- test/bitswap-mock-internals.js | 11 +++-------- test/network/network.node.js | 23 +++++++++++++++++------ test/utils/mocks.js | 3 +-- test/wantmanager/msg-queue.spec.js | 3 +-- 6 files changed, 42 insertions(+), 30 deletions(-) diff --git a/src/network.js b/src/network.js index 37ea7cb9..fa8bba44 100644 --- a/src/network.js +++ b/src/network.js @@ -5,6 +5,7 @@ const pull = require('pull-stream') const waterfall = require('async/waterfall') const each = require('async/each') const nextTick = require('async/nextTick') +const promisify = require('promisify-es6') const Message = require('./types/message') const CONSTANTS = require('./constants') @@ -109,7 +110,7 @@ class Network { (cb) => this.findProviders(cid, CONSTANTS.maxProvidersPerRequest, cb), (provs, cb) => { this._log('connecting to providers', provs.map((p) => p.id.toB58String())) - each(provs, (p, cb) => this.connectTo(p, cb)) + each(provs, (p, cb) => this.connectTo(p).then(() => cb())) } ], callback) } @@ -153,10 +154,18 @@ class Network { }) } - connectTo (peer, callback) { - if (!this._running) { return callback(new Error(`network isn't running`)) } + /** + * Connects to another peer + * + * @param {PeerInfo|PeerId|Multiaddr} + * @returns {Promise.} + */ + connectTo (peer) { + if (!this._running) { + throw new Error(`network isn't running`) + } - this.libp2p.dial(peer, callback) + return promisify(this.libp2p.dial.bind(this.libp2p))(peer) } // Dial to the peer and try to use the most recent Bitswap diff --git a/src/want-manager/msg-queue.js b/src/want-manager/msg-queue.js index 62abd05f..40d62643 100644 --- a/src/want-manager/msg-queue.js +++ b/src/want-manager/msg-queue.js @@ -46,19 +46,18 @@ module.exports = class MsgQueue { this.addMessage(msg) } - send (msg) { - this.network.connectTo(this.peerId, (err) => { - if (err) { - this._log.error('cant connect to peer %s: %s', this.peerId.toB58String(), err.message) - return - } - + async send (msg) { + try { + await this.network.connectTo(this.peerId) this._log('sending message') this.network.sendMessage(this.peerId, msg, (err) => { if (err) { this._log.error('send error: %s', err.message) } }) - }) + } catch (err) { + this._log.error('cant connect to peer %s: %s', this.peerId.toB58String(), err.message) + return + } } } diff --git a/test/bitswap-mock-internals.js b/test/bitswap-mock-internals.js index 5f16f9d0..b827f6e2 100644 --- a/test/bitswap-mock-internals.js +++ b/test/bitswap-mock-internals.js @@ -249,12 +249,10 @@ describe('bitswap with mocks', function () { let bs2 const n1 = { - connectTo (id, cb) { - let err + connectTo (id) { if (id.toHexString() !== other.toHexString()) { - err = new Error('unknown peer') + throw new Error('unknown peer') } - setImmediate(() => cb(err)) }, sendMessage (id, msg, cb) { if (id.toHexString() === other.toHexString()) { @@ -278,11 +276,9 @@ describe('bitswap with mocks', function () { } const n2 = { connectTo (id, cb) { - let err if (id.toHexString() !== me.toHexString()) { - err = new Error('unkown peer') + throw new Error('unknown peer') } - setImmediate(() => cb(err)) }, sendMessage (id, msg, cb) { if (id.toHexString() === me.toHexString()) { @@ -305,7 +301,6 @@ describe('bitswap with mocks', function () { } } - // Do not remove semi-colon. Will break the test. ;(async () => { // Create and start bs1 bs1 = new Bitswap(mockLibp2pNode(), repo.blocks) diff --git a/test/network/network.node.js b/test/network/network.node.js index cfea21d9..f1bb613d 100644 --- a/test/network/network.node.js +++ b/test/network/network.node.js @@ -123,10 +123,15 @@ describe('network', () => { }) it('connectTo fail', (done) => { - networkA.connectTo(p2pB.peerInfo.id, (err) => { - expect(err).to.exist() - done() - }) + (async () => { + try { + await networkA.connectTo(p2pB.peerInfo.id) + chai.assert.fail() + } catch (err) { + expect(err).to.exist() + done() + } + })() }) it('onPeerConnected success', (done) => { @@ -160,7 +165,11 @@ describe('network', () => { }) it('connectTo success', (done) => { - networkA.connectTo(p2pB.peerInfo, done) + networkA.connectTo(p2pB.peerInfo).then(() => { + done() + }).catch((err) => { + expect(err).to.not.exist() + }) }) it('._receiveMessage success from Bitswap 1.0.0', (done) => { @@ -279,7 +288,9 @@ describe('network', () => { function finish () { bitswapMockA._onPeerConnected = () => {} bitswapMockC._onPeerConnected = () => {} - networkA.connectTo(p2pC.peerInfo.id, done) + networkA.connectTo(p2pC.peerInfo.id).then(() => { + done() + }) } }) diff --git a/test/utils/mocks.js b/test/utils/mocks.js index 3f84fedb..e106e8e4 100644 --- a/test/utils/mocks.js +++ b/test/utils/mocks.js @@ -62,10 +62,9 @@ exports.mockNetwork = (calls, done) => { } return { - connectTo (p, cb) { + connectTo (p) { setImmediate(() => { connects.push(p) - cb() }) }, sendMessage (p, msg, cb) { diff --git a/test/wantmanager/msg-queue.spec.js b/test/wantmanager/msg-queue.spec.js index 1cb4a612..3c28ca67 100644 --- a/test/wantmanager/msg-queue.spec.js +++ b/test/wantmanager/msg-queue.spec.js @@ -75,9 +75,8 @@ describe('MessageQueue', () => { } const network = { - connectTo (p, cb) { + connectTo (p) { connects.push(p) - cb() }, sendMessage (p, msg, cb) { messages.push([p, msg]) From c53456b088b379c16b093edc8c1455c86b6fc76d Mon Sep 17 00:00:00 2001 From: nijynot Date: Fri, 19 Apr 2019 20:50:58 +0200 Subject: [PATCH 5/5] refactor: make `findProviders()` and `findAndConnect()` async/await --- src/index.js | 2 +- src/network.js | 44 +++++++++++++++++++++------------- src/want-manager/msg-queue.js | 1 - test/bitswap-mock-internals.js | 8 +++---- test/bitswap.js | 1 - test/utils/mocks.js | 4 ++-- 6 files changed, 35 insertions(+), 25 deletions(-) diff --git a/src/index.js b/src/index.js index 0fbab810..bb22d92f 100644 --- a/src/index.js +++ b/src/index.js @@ -144,7 +144,7 @@ class Bitswap { } _findAndConnect (cid) { - this.network.findAndConnect(cid, (err) => { + this.network.findAndConnect(cid).catch((err) => { if (err) this._log.error(err) }) } diff --git a/src/network.js b/src/network.js index fa8bba44..95a5acaa 100644 --- a/src/network.js +++ b/src/network.js @@ -2,8 +2,6 @@ const lp = require('pull-length-prefixed') const pull = require('pull-stream') -const waterfall = require('async/waterfall') -const each = require('async/each') const nextTick = require('async/nextTick') const promisify = require('promisify-es6') @@ -98,21 +96,35 @@ class Network { this.bitswap._onPeerDisconnected(peerInfo.id) } - findProviders (cid, maxProviders, callback) { - this.libp2p.contentRouting.findProviders(cid, { - maxTimeout: CONSTANTS.providerRequestTimeout, - maxNumProviders: maxProviders - }, callback) + /** + * Find providers given a `cid`. + * + * @param {CID} cid + * @param {number} maxProviders + * @returns {Promise.>} + */ + findProviders (cid, maxProviders) { + return promisify(this.libp2p.contentRouting.findProviders.bind(this.libp2p.contentRouting))( + cid, + { + maxTimeout: CONSTANTS.providerRequestTimeout, + maxNumProviders: maxProviders + } + ) } - findAndConnect (cid, callback) { - waterfall([ - (cb) => this.findProviders(cid, CONSTANTS.maxProvidersPerRequest, cb), - (provs, cb) => { - this._log('connecting to providers', provs.map((p) => p.id.toB58String())) - each(provs, (p, cb) => this.connectTo(p).then(() => cb())) - } - ], callback) + /** + * Find the providers of a given `cid` and connect to them. + * + * @param {CID} cid + * @returns {void} + */ + async findAndConnect (cid) { + const provs = await this.findProviders(cid, CONSTANTS.maxProvidersPerRequest) + this._log('connecting to providers', provs.map((p) => p.id.toB58String())) + await Promise.all(provs.map((p) => { + return this.connectTo(p) + })) } provide (cid, callback) { @@ -157,7 +169,7 @@ class Network { /** * Connects to another peer * - * @param {PeerInfo|PeerId|Multiaddr} + * @param {PeerInfo|PeerId|Multiaddr} peer * @returns {Promise.} */ connectTo (peer) { diff --git a/src/want-manager/msg-queue.js b/src/want-manager/msg-queue.js index 40d62643..320fb747 100644 --- a/src/want-manager/msg-queue.js +++ b/src/want-manager/msg-queue.js @@ -57,7 +57,6 @@ module.exports = class MsgQueue { }) } catch (err) { this._log.error('cant connect to peer %s: %s', this.peerId.toB58String(), err.message) - return } } } diff --git a/test/bitswap-mock-internals.js b/test/bitswap-mock-internals.js index b827f6e2..4a3b2a72 100644 --- a/test/bitswap-mock-internals.js +++ b/test/bitswap-mock-internals.js @@ -267,8 +267,8 @@ describe('bitswap with mocks', function () { stop (callback) { setImmediate(() => callback()) }, - findAndConnect (cid, callback) { - setImmediate(() => callback()) + findAndConnect (cid) { + return new Promise(() => {}) }, provide (cid, callback) { setImmediate(() => callback()) @@ -293,8 +293,8 @@ describe('bitswap with mocks', function () { stop (callback) { setImmediate(() => callback()) }, - findAndConnect (cid, callback) { - setImmediate(() => callback()) + findAndConnect (cid) { + return new Promise(() => {}) }, provide (cid, callback) { setImmediate(() => callback()) diff --git a/test/bitswap.js b/test/bitswap.js index 36f9ce04..99e3a877 100644 --- a/test/bitswap.js +++ b/test/bitswap.js @@ -84,7 +84,6 @@ describe('bitswap without DHT', function () { const block = await promisify(makeBlock)() await nodes[2].bitswap.put(block) - // await promisify(nodes[2].bitswap.put.bind(nodes[2].bitswap))(block) nodes[0].bitswap.get(block.cid).then((block) => { expect(block).to.not.exist() diff --git a/test/utils/mocks.js b/test/utils/mocks.js index e106e8e4..31c8f3df 100644 --- a/test/utils/mocks.js +++ b/test/utils/mocks.js @@ -77,8 +77,8 @@ exports.mockNetwork = (calls, done) => { start (callback) { setImmediate(() => callback()) }, - findAndConnect (cid, callback) { - setImmediate(() => callback()) + findAndConnect (cid) { + return new Promise(() => {}) }, provide (cid, callback) { setImmediate(() => callback())