diff --git a/package.json b/package.json index ab7ceb17..9ce9a6e0 100644 --- a/package.json +++ b/package.json @@ -76,9 +76,11 @@ "moving-average": "^1.0.0", "multicodec": "~0.5.0", "multihashing-async": "~0.5.1", + "promisify-es6": "^1.0.3", "protons": "^1.0.1", "pull-length-prefixed": "^1.3.1", "pull-stream": "^3.6.9", + "typical": "^4.0.0", "varint-decoder": "~0.1.1" }, "pre-push": [ diff --git a/src/index.js b/src/index.js index fddfac23..bb22d92f 100644 --- a/src/index.js +++ b/src/index.js @@ -1,17 +1,16 @@ 'use strict' const waterfall = require('async/waterfall') -const reject = require('async/reject') const each = require('async/each') -const series = require('async/series') -const map = require('async/map') const nextTick = require('async/nextTick') +const promisify = require('promisify-es6') +const typical = require('typical') const WantManager = require('./want-manager') const Network = require('./network') const DecisionEngine = require('./decision-engine') const Notifications = require('./notifications') -const logger = require('./utils').logger +const { logger, extendIterator } = require('./utils') const Stats = require('./stats') const defaultOptions = { @@ -112,7 +111,7 @@ class Bitswap { return nextTick(cb) } - this._putBlock(block, cb) + this.put(block).then(() => cb()) } ], callback) } @@ -144,21 +143,9 @@ class Bitswap { this._stats.disconnected(peerId) } - _putBlock (block, callback) { - this.blockstore.put(block, (err) => { - if (err) { - return callback(err) - } - - this.notifications.hasBlock(block) - this.network.provide(block.cid, (err) => { - if (err) { - this._log.error('Failed to provide: %s', err.message) - } - }) - - this.engine.receivedBlocks([block.cid]) - callback() + _findAndConnect (cid) { + this.network.findAndConnect(cid).catch((err) => { + if (err) this._log.error(err) }) } @@ -194,88 +181,72 @@ class Bitswap { * Fetch a given block by cid. If the block is in the local * blockstore it is returned, otherwise the block is added to the wantlist and returned once another node sends it to us. * - * @param {CID} cid - * @param {function(Error, Block)} callback - * @returns {void} + * @param {CID} cid - The CID of the block that should be retrieved. + * @param {Object} options + * @param {boolean} options.promptNetwork - Option whether to promptNetwork or not + * @returns {Promise.} - Returns a promise with a block corresponding with the given `cid`. */ - get (cid, callback) { - this.getMany([cid], (err, blocks) => { - if (err) { - return callback(err) - } + async get (cid, options) { + const optionsCopy = Object.assign({}, options) + + optionsCopy.promptNetwork = optionsCopy.promptNetwork || true + + const getFromOutside = (cid) => { + return new Promise((resolve) => { + this.wm.wantBlocks([cid]) + + this.notifications.wantBlock( + cid, + // called on block receive + (block) => { + this.wm.cancelWants([cid]) + resolve(block) + }, + // called on unwant + () => { + this.wm.cancelWants([cid]) + resolve(undefined) + } + ) + }) + } - if (blocks && blocks.length > 0) { - callback(null, blocks[0]) - } else { - // when a unwant happens - callback() + if (await promisify(this.blockstore.has)(cid)) { + return promisify(this.blockstore.get)(cid) + } else { + if (optionsCopy.promptNetwork) { + this._findAndConnect(cid) } - }) + return getFromOutside(cid) + } } /** * Fetch a a list of blocks by cid. If the blocks are in the local * blockstore they are returned, otherwise the blocks are added to the wantlist and returned once another node sends them to us. * - * @param {Array} cids - * @param {function(Error, Blocks)} callback - * @returns {void} + * @param {Iterable.} cids + * @returns {Iterable.>} */ - getMany (cids, callback) { - let pendingStart = cids.length - const wantList = [] - let promptedNetwork = false - - const getFromOutside = (cid, cb) => { - wantList.push(cid) - - this.notifications.wantBlock( - cid, - // called on block receive - (block) => { - this.wm.cancelWants([cid]) - cb(null, block) - }, - // called on unwant - () => { - this.wm.cancelWants([cid]) - cb(null, undefined) - } - ) + getMany (cids) { + if (!typical.isIterable(cids) || typical.isString(cids) || + Buffer.isBuffer(cids)) { + throw new Error('`cids` must be an iterable of CIDs') + } - if (!pendingStart) { - this.wm.wantBlocks(wantList) + const generator = async function * () { + let promptNetwork = true + for await (const cid of cids) { + if (promptNetwork) { + yield this.get(cid, { promptNetwork: true }) + promptNetwork = false + } else { + yield this.get(cid, { promptNetwork: false }) + } } - } + }.bind(this) - map(cids, (cid, cb) => { - waterfall( - [ - (cb) => this.blockstore.has(cid, cb), - (has, cb) => { - pendingStart-- - if (has) { - if (!pendingStart) { - this.wm.wantBlocks(wantList) - } - return this.blockstore.get(cid, cb) - } - - if (!promptedNetwork) { - promptedNetwork = true - this.network.findAndConnect(cids[0], (err) => { - if (err) { - this._log.error(err) - } - }) - } - - // we don't have the block here - getFromOutside(cid, cb) - } - ], - cb) - }, callback) + return extendIterator(generator()) } // removes the given cids from the wantlist independent of any ref counts @@ -300,61 +271,53 @@ class Bitswap { * Put the given block to the underlying blockstore and * send it to nodes that have it in their wantlist. * - * @param {Block} block - * @param {function(Error)} callback - * @returns {void} + * @param {Block} block - Block that should be inserted. + * @returns {Promise.} - Returns the CID of the serialized IPLD Nodes. */ - put (block, callback) { + async put (block) { this._log('putting block') - waterfall([ - (cb) => this.blockstore.has(block.cid, cb), - (has, cb) => { - if (has) { - return nextTick(cb) - } + const has = await promisify(this.blockstore.has)(block.cid) + if (!has) { + await promisify(this.blockstore.put)(block) + this.notifications.hasBlock(block) + this.network.provide(block.cid, (err) => { + if (err) this._log.error('Failed to provide: %s', err.message) + }) + this.engine.receivedBlocks([block.cid]) + } - this._putBlock(block, cb) - } - ], callback) + return block.cid } /** * Put the given blocks to the underlying blockstore and * send it to nodes that have it them their wantlist. * - * @param {Array} blocks - * @param {function(Error)} callback - * @returns {void} + * @param {Iterable.} blocks + * @returns {Iterable.>} - Returns an async iterator with the CIDs of the blocks inserted */ - putMany (blocks, callback) { - waterfall([ - (cb) => reject(blocks, (b, cb) => { - this.blockstore.has(b.cid, cb) - }, cb), - (newBlocks, cb) => this.blockstore.putMany(newBlocks, (err) => { - if (err) { - return cb(err) + putMany (blocks) { + if (!typical.isIterable(blocks)) { + throw new Error('`blocks` must be an iterable') + } + + const generator = async function * () { + for await (const block of blocks) { + const has = await promisify(this.blockstore.has)(block.cid) + if (!has) { + yield this.put(block) } + } + }.bind(this) - newBlocks.forEach((block) => { - this.notifications.hasBlock(block) - this.engine.receivedBlocks([block.cid]) - this.network.provide(block.cid, (err) => { - if (err) { - this._log.error('Failed to provide: %s', err.message) - } - }) - }) - cb() - }) - ], callback) + return extendIterator(generator()) } /** * Get the current list of wants. * - * @returns {Iterator} + * @returns {Iterator.} */ getWantlist () { return this.wm.wantlist.entries() @@ -363,7 +326,7 @@ class Bitswap { /** * Get the current list of partners. * - * @returns {Array} + * @returns {Array.} */ peers () { return this.engine.peers() @@ -381,32 +344,25 @@ class Bitswap { /** * Start the bitswap node. * - * @param {function(Error)} callback - * - * @returns {void} + * @returns {Promise} */ - start (callback) { - series([ - (cb) => this.wm.start(cb), - (cb) => this.network.start(cb), - (cb) => this.engine.start(cb) - ], callback) + async start () { + await promisify(this.wm.start.bind(this.wm))() + await promisify(this.network.start.bind(this.network))() + await promisify(this.engine.start.bind(this.engine))() } /** * Stop the bitswap node. * - * @param {function(Error)} callback - * - * @returns {void} + * @returns {Promise} */ - stop (callback) { + async stop () { this._stats.stop() - series([ - (cb) => this.wm.stop(cb), - (cb) => this.network.stop(cb), - (cb) => this.engine.stop(cb) - ], callback) + + await promisify(this.wm.stop.bind(this.wm))() + await promisify(this.network.stop.bind(this.network))() + await promisify(this.engine.stop.bind(this.engine))() } } diff --git a/src/network.js b/src/network.js index 37ea7cb9..95a5acaa 100644 --- a/src/network.js +++ b/src/network.js @@ -2,9 +2,8 @@ const lp = require('pull-length-prefixed') const pull = require('pull-stream') -const waterfall = require('async/waterfall') -const each = require('async/each') const nextTick = require('async/nextTick') +const promisify = require('promisify-es6') const Message = require('./types/message') const CONSTANTS = require('./constants') @@ -97,21 +96,35 @@ class Network { this.bitswap._onPeerDisconnected(peerInfo.id) } - findProviders (cid, maxProviders, callback) { - this.libp2p.contentRouting.findProviders(cid, { - maxTimeout: CONSTANTS.providerRequestTimeout, - maxNumProviders: maxProviders - }, callback) + /** + * Find providers given a `cid`. + * + * @param {CID} cid + * @param {number} maxProviders + * @returns {Promise.>} + */ + findProviders (cid, maxProviders) { + return promisify(this.libp2p.contentRouting.findProviders.bind(this.libp2p.contentRouting))( + cid, + { + maxTimeout: CONSTANTS.providerRequestTimeout, + maxNumProviders: maxProviders + } + ) } - findAndConnect (cid, callback) { - waterfall([ - (cb) => this.findProviders(cid, CONSTANTS.maxProvidersPerRequest, cb), - (provs, cb) => { - this._log('connecting to providers', provs.map((p) => p.id.toB58String())) - each(provs, (p, cb) => this.connectTo(p, cb)) - } - ], callback) + /** + * Find the providers of a given `cid` and connect to them. + * + * @param {CID} cid + * @returns {void} + */ + async findAndConnect (cid) { + const provs = await this.findProviders(cid, CONSTANTS.maxProvidersPerRequest) + this._log('connecting to providers', provs.map((p) => p.id.toB58String())) + await Promise.all(provs.map((p) => { + return this.connectTo(p) + })) } provide (cid, callback) { @@ -153,10 +166,18 @@ class Network { }) } - connectTo (peer, callback) { - if (!this._running) { return callback(new Error(`network isn't running`)) } + /** + * Connects to another peer + * + * @param {PeerInfo|PeerId|Multiaddr} peer + * @returns {Promise.} + */ + connectTo (peer) { + if (!this._running) { + throw new Error(`network isn't running`) + } - this.libp2p.dial(peer, callback) + return promisify(this.libp2p.dial.bind(this.libp2p))(peer) } // Dial to the peer and try to use the most recent Bitswap diff --git a/src/utils.js b/src/utils.js index 25f6bf32..34abe623 100644 --- a/src/utils.js +++ b/src/utils.js @@ -80,11 +80,41 @@ const sortBy = (fn, list) => { }) } +const first = async (iterator) => { + for await (const value of iterator) { + return value + } +} + +const last = async (iterator) => { + let value + for await (value of iterator) { + // Intentionally empty + } + return value +} + +const all = async (iterator) => { + const values = [] + for await (const value of iterator) { + values.push(value) + } + return values +} + +const extendIterator = (iterator) => { + iterator.first = () => first(iterator) + iterator.last = () => last(iterator) + iterator.all = () => all(iterator) + return iterator +} + module.exports = { logger, includesWith, uniqWith, groupBy, pullAllWith, - sortBy + sortBy, + extendIterator } diff --git a/src/want-manager/msg-queue.js b/src/want-manager/msg-queue.js index 62abd05f..320fb747 100644 --- a/src/want-manager/msg-queue.js +++ b/src/want-manager/msg-queue.js @@ -46,19 +46,17 @@ module.exports = class MsgQueue { this.addMessage(msg) } - send (msg) { - this.network.connectTo(this.peerId, (err) => { - if (err) { - this._log.error('cant connect to peer %s: %s', this.peerId.toB58String(), err.message) - return - } - + async send (msg) { + try { + await this.network.connectTo(this.peerId) this._log('sending message') this.network.sendMessage(this.peerId, msg, (err) => { if (err) { this._log.error('send error: %s', err.message) } }) - }) + } catch (err) { + this._log.error('cant connect to peer %s: %s', this.peerId.toB58String(), err.message) + } } } diff --git a/test/bitswap-mock-internals.js b/test/bitswap-mock-internals.js index 24c39d92..4a3b2a72 100644 --- a/test/bitswap-mock-internals.js +++ b/test/bitswap-mock-internals.js @@ -3,7 +3,6 @@ 'use strict' const eachSeries = require('async/eachSeries') -const waterfall = require('async/waterfall') const map = require('async/map') const parallel = require('async/parallel') const setImmediate = require('async/setImmediate') @@ -12,6 +11,7 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect const PeerId = require('peer-id') +const promisify = require('promisify-es6') const Message = require('../src/types/message') const Bitswap = require('../src') @@ -55,9 +55,11 @@ describe('bitswap with mocks', function () { describe('receive message', () => { it('simple block message', (done) => { - const bs = new Bitswap(mockLibp2pNode(), repo.blocks) - bs.start((err) => { - expect(err).to.not.exist() + (async () => { + const bs = new Bitswap(mockLibp2pNode(), repo.blocks) + await bs.start().catch((err) => { + expect(err).to.not.exist() + }) const other = ids[1] @@ -86,13 +88,16 @@ describe('bitswap with mocks', function () { done() }) }) - }) + })() }) it('simple want message', (done) => { - const bs = new Bitswap(mockLibp2pNode(), repo.blocks) - bs.start((err) => { - expect(err).to.not.exist() + (async () => { + const bs = new Bitswap(mockLibp2pNode(), repo.blocks) + await bs.start().catch((err) => { + expect(err).to.not.exist() + }) + const other = ids[1] const b1 = blocks[0] const b2 = blocks[1] @@ -112,7 +117,7 @@ describe('bitswap with mocks', function () { done() }) - }) + })() }) it('multi peer', function (done) { @@ -122,8 +127,8 @@ describe('bitswap with mocks', function () { let others let blocks - bs.start((err) => { - expect(err).to.not.exist() + (async () => { + await bs.start() parallel([ (cb) => map(_.range(5), (i, cb) => PeerId.create({ bits: 512 }, cb), cb), @@ -155,84 +160,76 @@ describe('bitswap with mocks', function () { }, done) }) } - }) + })() }) }) describe('get', () => { - it('fails on requesting empty block', (done) => { + it('fails on requesting empty block', async () => { const bs = new Bitswap(mockLibp2pNode(), repo.blocks) - bs.get(null, (err, res) => { + try { + await bs.get(null) + } catch (err) { expect(err).to.exist() expect(err.message).to.equal('Not a valid cid') - done() - }) + } }) - it('block exists locally', (done) => { + it('block exists locally', async () => { const block = blocks[4] + await promisify(repo.blocks.put)(block) + const bs = new Bitswap(mockLibp2pNode(), repo.blocks) - repo.blocks.put(block, (err) => { - expect(err).to.not.exist() - const bs = new Bitswap(mockLibp2pNode(), repo.blocks) - - bs.get(block.cid, (err, res) => { - expect(err).to.not.exist() - expect(res).to.eql(block) - done() - }) - }) + const retrievedBlock = await bs.get(block.cid) + expect(retrievedBlock).to.eql(block) }) - it('blocks exist locally', (done) => { + it('blocks exist locally', async () => { const b1 = blocks[3] const b2 = blocks[14] const b3 = blocks[13] - repo.blocks.putMany([b1, b2, b3], (err) => { - expect(err).to.not.exist() - - const bs = new Bitswap(mockLibp2pNode(), repo.blocks) + await promisify(repo.blocks.putMany)([b1, b2, b3]) + const bs = new Bitswap(mockLibp2pNode(), repo.blocks) - bs.getMany([b1.cid, b2.cid, b3.cid], (err, res) => { - expect(err).to.not.exist() - expect(res).to.be.eql([b1, b2, b3]) - done() - }) - }) + const gen = bs.getMany([b1.cid, b2.cid, b3.cid]) + const retrievedBlocks = [] + for await (const block of gen) { + retrievedBlocks.push(block) + } + expect(retrievedBlocks).to.be.eql([b1, b2, b3]) }) - it('getMany', (done) => { + it('getMany', async () => { const b1 = blocks[5] const b2 = blocks[6] const b3 = blocks[7] - repo.blocks.putMany([b1, b2, b3], (err) => { - expect(err).to.not.exist() + await promisify(repo.blocks.putMany)([b1, b2, b3]) + const bs = new Bitswap(mockLibp2pNode(), repo.blocks) - const bs = new Bitswap(mockLibp2pNode(), repo.blocks) + const block1 = await bs.get(b1.cid) + expect(block1).to.eql(b1) - map([b1.cid, b2.cid, b3.cid], (cid, cb) => bs.get(cid, cb), (err, res) => { - expect(err).to.not.exist() - expect(res).to.eql([b1, b2, b3]) - done() - }) - }) + const block2 = await bs.get(b2.cid) + expect(block2).to.eql(b2) + + const block3 = await bs.get(b3.cid) + expect(block3).to.eql(b3) }) it('block is added locally afterwards', (done) => { - const finish = orderedFinish(2, done) - const block = blocks[9] - const bs = new Bitswap(mockLibp2pNode(), repo.blocks) - const net = mockNetwork() + (async () => { + const finish = orderedFinish(2, done) + const block = blocks[9] + const bs = new Bitswap(mockLibp2pNode(), repo.blocks) + const net = mockNetwork() - bs.network = net - bs.wm.network = net - bs.engine.network = net - bs.start((err) => { - expect(err).to.not.exist() - bs.get(block.cid, (err, res) => { - expect(err).to.not.exist() + bs.network = net + bs.wm.network = net + bs.engine.network = net + await bs.start() + bs.get(block.cid).then((res) => { expect(res).to.eql(block) finish(2) }) @@ -241,7 +238,7 @@ describe('bitswap with mocks', function () { finish(1) bs.put(block, () => {}) }, 200) - }) + })() }) it('block is sent after local add', (done) => { @@ -252,12 +249,10 @@ describe('bitswap with mocks', function () { let bs2 const n1 = { - connectTo (id, cb) { - let err + connectTo (id) { if (id.toHexString() !== other.toHexString()) { - err = new Error('unknown peer') + throw new Error('unknown peer') } - setImmediate(() => cb(err)) }, sendMessage (id, msg, cb) { if (id.toHexString() === other.toHexString()) { @@ -272,8 +267,8 @@ describe('bitswap with mocks', function () { stop (callback) { setImmediate(() => callback()) }, - findAndConnect (cid, callback) { - setImmediate(() => callback()) + findAndConnect (cid) { + return new Promise(() => {}) }, provide (cid, callback) { setImmediate(() => callback()) @@ -281,11 +276,9 @@ describe('bitswap with mocks', function () { } const n2 = { connectTo (id, cb) { - let err if (id.toHexString() !== me.toHexString()) { - err = new Error('unkown peer') + throw new Error('unknown peer') } - setImmediate(() => cb(err)) }, sendMessage (id, msg, cb) { if (id.toHexString() === me.toHexString()) { @@ -300,46 +293,39 @@ describe('bitswap with mocks', function () { stop (callback) { setImmediate(() => callback()) }, - findAndConnect (cid, callback) { - setImmediate(() => callback()) + findAndConnect (cid) { + return new Promise(() => {}) }, provide (cid, callback) { setImmediate(() => callback()) } } - bs1 = new Bitswap(mockLibp2pNode(), repo.blocks) - applyNetwork(bs1, n1) - - bs1.start((err) => { - expect(err).to.not.exist() - let repo2 + ;(async () => { + // Create and start bs1 + bs1 = new Bitswap(mockLibp2pNode(), repo.blocks) + applyNetwork(bs1, n1) + await bs1.start() - waterfall([ - (cb) => createTempRepo(cb), - (repo, cb) => { - repo2 = repo - bs2 = new Bitswap(mockLibp2pNode(), repo2.blocks) - applyNetwork(bs2, n2) - bs2.start((err) => { - expect(err).to.not.exist() + // Create and start bs2 + const repo2 = await promisify(createTempRepo)() + bs2 = new Bitswap(mockLibp2pNode(), repo2.blocks) + applyNetwork(bs2, n2) + await bs2.start() - bs1._onPeerConnected(other) - bs2._onPeerConnected(me) + bs1._onPeerConnected(other) + bs2._onPeerConnected(me) - bs1.get(block.cid, (err, res) => { - expect(err).to.not.exist() - cb(null, res) - }) - setTimeout(() => bs2.put(block, () => {}), 1000) - }) - }, - (res, cb) => { - expect(res).to.eql(block) - cb() - } - ], done) - }) + bs1.get(block.cid).then((res) => { + expect(res).to.eql(block) + done() + }).catch((err) => { + expect(err).to.not.exist() + }) + setTimeout(() => { + bs2.put(block, () => {}) + }, 1000) + })() }) it('double get', (done) => { @@ -347,18 +333,16 @@ describe('bitswap with mocks', function () { const bs = new Bitswap(mockLibp2pNode(), repo.blocks) - parallel( - [ - (cb) => bs.get(block.cid, cb), - (cb) => bs.get(block.cid, cb) - ], - (err, res) => { - expect(err).to.not.exist() - expect(res[0]).to.eql(block) - expect(res[1]).to.eql(block) - done() - } - ) + Promise.all([ + bs.get(block.cid), + bs.get(block.cid) + ]).then((res) => { + expect(res[0]).to.eql(block) + expect(res[1]).to.eql(block) + done() + }).catch((err) => { + expect(err).to.not.exist() + }) bs.put(block, (err) => { expect(err).to.not.exist() @@ -368,24 +352,23 @@ describe('bitswap with mocks', function () { describe('unwant', () => { it('removes blocks that are wanted multiple times', (done) => { - const bs = new Bitswap(mockLibp2pNode(), repo.blocks) - bs.start((err) => { - expect(err).to.not.exist() - const b = blocks[12] - - let counter = 0 - const check = (err, res) => { - expect(err).to.not.exist() - expect(res).to.not.exist() - - if (++counter === 2) { done() } - } + (async () => { + const bs = new Bitswap(mockLibp2pNode(), repo.blocks) + await bs.start() - bs.get(b.cid, check) - bs.get(b.cid, check) + const b = blocks[12] + Promise.all([ + bs.get(b.cid), + bs.get(b.cid) + ]).then((res) => { + expect(res[1]).to.not.exist() + done() + }).catch((e) => { + expect(e).to.not.exist() + }) setTimeout(() => bs.unwant(b.cid), 10) - }) + })() }) }) diff --git a/test/bitswap-stats.js b/test/bitswap-stats.js index 73c27a48..8209715c 100644 --- a/test/bitswap-stats.js +++ b/test/bitswap-stats.js @@ -90,9 +90,11 @@ describe('bitswap stats', () => { }) // start the first bitswap - before((done) => bs.start(done)) + before((done) => { + bs.start().then(() => done()) + }) - after((done) => each(bitswaps, (bs, cb) => bs.stop(cb), done)) + after((done) => each(bitswaps, (bs, cb) => bs.stop().then(() => cb()), done)) after((done) => each(repos, (repo, cb) => repo.teardown(cb), done)) @@ -196,11 +198,11 @@ describe('bitswap stats', () => { before((done) => { bs2 = bitswaps[1] - bs2.start(done) + bs2.start().then(() => done()) }) after((done) => { - bs2.stop(done) + bs2.stop().then(() => done()) }) before((done) => { @@ -212,8 +214,8 @@ describe('bitswap stats', () => { }) }) - before((done) => { - bs.put(block, done) + before(async () => { + await bs.put(block) }) it('updates stats on transfer', (done) => { @@ -231,10 +233,11 @@ describe('bitswap stats', () => { finish() }) - bs2.get(block.cid, (err, block) => { - expect(err).to.not.exist() + bs2.get(block.cid).then(() => { expect(block).to.exist() finish() + }).catch((err) => { + expect(err).to.not.exist() }) }) diff --git a/test/bitswap.js b/test/bitswap.js index ec1b52f2..99e3a877 100644 --- a/test/bitswap.js +++ b/test/bitswap.js @@ -6,6 +6,7 @@ const waterfall = require('async/waterfall') const series = require('async/series') const each = require('async/each') const parallel = require('async/parallel') +const promisify = require('promisify-es6') const chai = require('chai') chai.use(require('dirty-chai')) @@ -29,7 +30,7 @@ function createThing (dht, callback) { }, (repo, libp2pNode, cb) => { const bitswap = new Bitswap(libp2pNode, repo.blocks) - bitswap.start((err) => cb(err, repo, libp2pNode, bitswap)) + bitswap.start().then((err) => cb(err, repo, libp2pNode, bitswap)) } ], (err, repo, libp2pNode, bitswap) => { expect(err).to.not.exist() @@ -63,7 +64,7 @@ describe('bitswap without DHT', function () { after((done) => { each(nodes, (node, cb) => { series([ - (cb) => node.bitswap.stop(cb), + (cb) => node.bitswap.stop().then(() => cb()), (cb) => node.libp2pNode.stop(cb), (cb) => node.repo.teardown(cb) ], cb) @@ -78,15 +79,13 @@ describe('bitswap without DHT', function () { }) it('put a block in 2, fail to get it in 0', (done) => { - const finish = orderedFinish(2, done) + (async () => { + const finish = orderedFinish(2, done) - waterfall([ - (cb) => makeBlock(cb), - (block, cb) => nodes[2].bitswap.put(block, () => cb(null, block)) - ], (err, block) => { - expect(err).to.not.exist() - nodes[0].bitswap.get(block.cid, (err, block) => { - expect(err).to.not.exist() + const block = await promisify(makeBlock)() + await nodes[2].bitswap.put(block) + + nodes[0].bitswap.get(block.cid).then((block) => { expect(block).to.not.exist() finish(2) }) @@ -95,7 +94,7 @@ describe('bitswap without DHT', function () { finish(1) nodes[0].bitswap.unwant(block.cid) }, 200) - }) + })() }) }) @@ -120,7 +119,7 @@ describe('bitswap with DHT', function () { after((done) => { each(nodes, (node, cb) => { series([ - (cb) => node.bitswap.stop(cb), + (cb) => node.bitswap.stop().then(() => cb()), (cb) => node.libp2pNode.stop(cb), (cb) => node.repo.teardown(cb) ], cb) @@ -134,16 +133,13 @@ describe('bitswap with DHT', function () { ], done) }) - it('put a block in 2, get it in 0', function (done) { - waterfall([ - (cb) => makeBlock(cb), - (block, cb) => nodes[2].bitswap.put(block, () => cb(null, block)), - (block, cb) => nodes[0].bitswap.get(block.cid, (err, blockRetrieved) => { - expect(err).to.not.exist() - expect(block.data).to.eql(blockRetrieved.data) - expect(block.cid).to.eql(blockRetrieved.cid) - cb() - }) - ], done) + it('put a block in 2, get it in 0', async () => { + const block = await promisify(makeBlock)() + nodes[2].bitswap.put(block) + // await promisify(nodes[2].bitswap.put.bind(nodes[2].bitswap))(block) + + const blockRetrieved = await nodes[0].bitswap.get(block.cid) + expect(block.data).to.eql(blockRetrieved.data) + expect(block.cid).to.eql(blockRetrieved.cid) }) }) diff --git a/test/network/gen-bitswap-network.node.js b/test/network/gen-bitswap-network.node.js index 2cde6492..6cc8685b 100644 --- a/test/network/gen-bitswap-network.node.js +++ b/test/network/gen-bitswap-network.node.js @@ -46,13 +46,12 @@ describe('gen Bitswap network', function () { }), (cb) => each( blocks, - (b, cb) => node.bitswap.put(b, cb), + (b, cb) => node.bitswap.put(b).then(() => cb()), cb ), - (cb) => map(_.range(100), (i, cb) => { - node.bitswap.get(blocks[i].cid, cb) - }, (err, res) => { - expect(err).to.not.exist() + (cb) => Promise.all(_.range(100).map((i) => { + return node.bitswap.get(blocks[i].cid) + })).then((res) => { expect(res).to.have.length(blocks.length) cb() }) @@ -115,23 +114,25 @@ function round (nodeArr, n, cb) { return blocks[index] }) - each(data, (d, cb) => node.bitswap.put(d, cb), cb) + each(data, (d, cb) => node.bitswap.put(d).then(() => cb()), cb) }), cb), (cb) => { d = (new Date()).getTime() - // fetch all blocks on every node - parallel(_.map(nodeArr, (node, i) => (cb) => { - map(cids, (cid, cb) => { - node.bitswap.get(cid, cb) - }, (err, res) => { - if (err) { - return cb(err) - } - expect(res).to.have.length(blocks.length) - cb() - }) - }), cb) + // fetch all blocks on every node + Promise.all(nodeArr.map((node) => { + return Promise.all( + cids.map((cid) => { + return node.bitswap.get(cid) + }) + ) + })).then((res) => { + expect(res[0]).to.deep.equal(blocks) + expect(res[1]).to.deep.equal(blocks) + cb() + }).catch((err) => { + cb(err) + }) } ], (err) => { if (err) { diff --git a/test/network/network.node.js b/test/network/network.node.js index cfea21d9..f1bb613d 100644 --- a/test/network/network.node.js +++ b/test/network/network.node.js @@ -123,10 +123,15 @@ describe('network', () => { }) it('connectTo fail', (done) => { - networkA.connectTo(p2pB.peerInfo.id, (err) => { - expect(err).to.exist() - done() - }) + (async () => { + try { + await networkA.connectTo(p2pB.peerInfo.id) + chai.assert.fail() + } catch (err) { + expect(err).to.exist() + done() + } + })() }) it('onPeerConnected success', (done) => { @@ -160,7 +165,11 @@ describe('network', () => { }) it('connectTo success', (done) => { - networkA.connectTo(p2pB.peerInfo, done) + networkA.connectTo(p2pB.peerInfo).then(() => { + done() + }).catch((err) => { + expect(err).to.not.exist() + }) }) it('._receiveMessage success from Bitswap 1.0.0', (done) => { @@ -279,7 +288,9 @@ describe('network', () => { function finish () { bitswapMockA._onPeerConnected = () => {} bitswapMockC._onPeerConnected = () => {} - networkA.connectTo(p2pC.peerInfo.id, done) + networkA.connectTo(p2pC.peerInfo.id).then(() => { + done() + }) } }) diff --git a/test/utils/mocks.js b/test/utils/mocks.js index 3f84fedb..31c8f3df 100644 --- a/test/utils/mocks.js +++ b/test/utils/mocks.js @@ -62,10 +62,9 @@ exports.mockNetwork = (calls, done) => { } return { - connectTo (p, cb) { + connectTo (p) { setImmediate(() => { connects.push(p) - cb() }) }, sendMessage (p, msg, cb) { @@ -78,8 +77,8 @@ exports.mockNetwork = (calls, done) => { start (callback) { setImmediate(() => callback()) }, - findAndConnect (cid, callback) { - setImmediate(() => callback()) + findAndConnect (cid) { + return new Promise(() => {}) }, provide (cid, callback) { setImmediate(() => callback()) diff --git a/test/wantmanager/msg-queue.spec.js b/test/wantmanager/msg-queue.spec.js index 1cb4a612..3c28ca67 100644 --- a/test/wantmanager/msg-queue.spec.js +++ b/test/wantmanager/msg-queue.spec.js @@ -75,9 +75,8 @@ describe('MessageQueue', () => { } const network = { - connectTo (p, cb) { + connectTo (p) { connects.push(p) - cb() }, sendMessage (p, msg, cb) { messages.push([p, msg])