From c65e7221f84c1c8c287ece136160d025c3af1ad4 Mon Sep 17 00:00:00 2001 From: Friedel Ziegelmayer Date: Tue, 20 Dec 2016 12:00:08 +0100 Subject: [PATCH] feat: more perf stuffs --- README.md | 18 +-- package.json | 9 +- src/components/decision-engine/index.js | 74 ++++----- src/index.js | 143 ++++++++---------- src/types/message/index.js | 17 +-- test/components/decision-engine/index-test.js | 2 +- test/index-test.js | 11 +- test/types/message.spec.js | 26 ++-- 8 files changed, 130 insertions(+), 170 deletions(-) diff --git a/README.md b/README.md index 640af1f4..f0288d21 100644 --- a/README.md +++ b/README.md @@ -104,24 +104,24 @@ pull( - `cids: CID|[]CID` -Cancel previously requested keys, forcefully. That means they are removed from the -wantlist independent of how many other resources requested these keys. Callbacks -attached to `getBlock` are errored with `Error('manual unwant: key')`. +Cancel previously requested cids, forcefully. That means they are removed from the +wantlist independent of how many other resources requested these cids. Callbacks +attached to `getBlock` are errored with `Error('manual unwant: cid)`. #### `cancelWants(cids)` - `cid: CID|[]CID` -Cancel previously requested keys. +Cancel previously requested cids. #### `putStream()` -Returns a duplex `pull-stream` that emits an object `{key: Multihash}` for every written block when it was stored. -Objects passed into here should be of the form `{data: Buffer, key: Multihash}` +Returns a duplex `pull-stream` that emits an object `{cid: CID}` for every written block when it was stored. +Objects passed into here should be of the form `{data: Buffer, cid: CID}` #### `put(blockAndCid, callback)` -- `blockAndKey: {data: Buffer, cid: CID}` +- `blockAndCid: {data: Buffer, cid: CID}` - `callback: Function` Announce that the current node now has the block containing `data`. This will store it @@ -152,9 +152,7 @@ src │   ├── decision │   │   ├── engine.js │   │   ├── index.js -│   │   ├── ledger.js -│   │   ├── peer-request-queue.js -│   │   └── pq.js +│   │   └── ledger.js │   ├── network # Handles peerSet and open new conns │   │   └── index.js │   └── want-manager # Keeps track of all blocks the peer wants (not the others which it is connected) diff --git a/package.json b/package.json index c68386b8..e9fb3ef8 100644 --- a/package.json +++ b/package.json @@ -44,12 +44,12 @@ "idb-pull-blob-store": "^0.5.1", "interface-pull-blob-store": "^0.6.0", "ipfs-repo": "^0.11.2", - "libp2p-ipfs-nodejs": "^0.17.0", + "libp2p-ipfs-nodejs": "^0.17.1", "lodash": "^4.17.2", "multiaddr": "^2.1.1", "ncp": "^2.0.0", "peer-book": "^0.3.0", - "peer-id": "^0.8.0", + "peer-id": "^0.8.1", "peer-info": "^0.8.1", "rimraf": "^2.5.4", "safe-buffer": "^5.0.1" @@ -57,9 +57,9 @@ "dependencies": { "async": "^2.1.4", "cids": "^0.3.5", - "debug": "^2.4.4", + "debug": "^2.5.1", "heap": "^0.2.6", - "ipfs-block": "^0.5.3", + "ipfs-block": "^0.5.4", "lodash.debounce": "^4.0.8", "lodash.find": "^4.6.0", "lodash.groupby": "^4.6.0", @@ -68,7 +68,6 @@ "lodash.pullallwith": "^4.7.0", "lodash.uniqwith": "^4.5.0", "lodash.values": "^4.3.0", - "multihashes": "^0.3.1", "protocol-buffers": "^3.2.1", "pull-defer": "^0.2.2", "pull-length-prefixed": "^1.2.0", diff --git a/src/components/decision-engine/index.js b/src/components/decision-engine/index.js index 5617a6ee..ed42f319 100644 --- a/src/components/decision-engine/index.js +++ b/src/components/decision-engine/index.js @@ -11,7 +11,6 @@ const find = require('lodash.find') const values = require('lodash.values') const groupBy = require('lodash.groupby') const pullAllWith = require('lodash.pullallwith') -const CID = require('cids') const log = debug('bitswap:engine') log.error = debug('bitswap:engine:error') @@ -124,18 +123,17 @@ class DecisionEngine { return } // Check all connected peers if they want the block we received - for (let l of this.ledgerMap.values()) { + this.ledgerMap.forEach((ledger) => { cids - .map((k) => l.wantlistContains(k)) + .map((cid) => ledger.wantlistContains(cid)) .filter(Boolean) - .forEach((e) => { - // this.peerRequestQueue.push(e, l.partner) + .forEach((entry) => { this._tasks.push({ - entry: e, - target: l.partner + entry: entry, + target: ledger.partner }) }) - } + }) this._outbox() } @@ -152,30 +150,26 @@ class DecisionEngine { ledger.wantlist = new Wantlist() } - this._processBlocks(msg.blocks, ledger, (err) => { - if (err) { - log.error(`failed to process blocks: ${err.message}`) - } + this._processBlocks(msg.blocks, ledger) - if (msg.wantlist.size === 0) { - return cb() - } + if (msg.wantlist.size === 0) { + return cb() + } - let cancels = [] - let wants = [] - for (let entry of msg.wantlist.values()) { - if (entry.cancel) { - ledger.cancelWant(entry.cid) - cancels.push(entry) - } else { - ledger.wants(entry.cid, entry.priority) - wants.push(entry) - } + let cancels = [] + let wants = [] + msg.wantlist.forEach((entry) => { + if (entry.cancel) { + ledger.cancelWant(entry.cid) + cancels.push(entry) + } else { + ledger.wants(entry.cid, entry.priority) + wants.push(entry) } - - this._cancelWants(ledger, peerId, cancels) - this._addWants(ledger, peerId, wants, cb) }) + + this._cancelWants(ledger, peerId, cancels) + this._addWants(ledger, peerId, wants, cb) } _cancelWants (ledger, peerId, entries) { @@ -209,24 +203,14 @@ class DecisionEngine { } _processBlocks (blocks, ledger, callback) { - map(blocks.values(), (block, cb) => { - block.key((err, key) => { - if (err) { - return cb(err) - } - log('got block (%s bytes)', block.data.length) - ledger.receivedBytes(block.data.length) - - cb(null, new CID(key)) - }) - }, (err, cids) => { - if (err) { - return callback(err) - } - - this.receivedBlocks(cids) - callback() + const cids = [] + blocks.forEach((b, cidStr) => { + log('got block (%s bytes)', b.block.data.length) + ledger.receivedBytes(b.block.data.length) + cids.push(b.cid) }) + + this.receivedBlocks(cids) } // Clear up all accounting things after message was sent diff --git a/src/index.js b/src/index.js index e3831546..87c9b6f8 100644 --- a/src/index.js +++ b/src/index.js @@ -1,21 +1,21 @@ 'use strict' -const series = require('async/series') -const debug = require('debug') - -const log = debug('bitswap') -log.error = debug('bitswap:error') +const waterfall = require('async/waterfall') +const each = require('async/each') const EventEmitter = require('events').EventEmitter const pull = require('pull-stream') const paramap = require('pull-paramap') const defer = require('pull-defer/source') -const CID = require('cids') +const debug = require('debug') const CONSTANTS = require('./constants') const WantManager = require('./components/want-manager') const Network = require('./components/network') const DecisionEngine = require('./components/decision-engine') +const log = debug('bitswap') +log.error = debug('bitswap:error') + class Bitswap { constructor (libp2p, blockstore, peerBook) { this.libp2p = libp2p @@ -46,83 +46,53 @@ class Bitswap { log('failed to receive message', incoming) } - const cidsAndBlocks = Array - .from(incoming.blocks.entries()) - .map((entry) => { - return { cid: new CID(entry[0]), block: entry[1] } - }) - - if (cidsAndBlocks.length === 0) { + if (incoming.blocks.size === 0) { return cb() } + const cidsAndBlocks = Array.from(incoming.blocks.values()) + // quickly send out cancels, reduces chances of duplicate block receives - pull( - pull.values(cidsAndBlocks), - pull.filter((cidAndBlock) => this.wm.wantlist.contains(cidAndBlock.cid)), - pull.collect((err, cidsAndBlocks) => { - if (err) { - return log.error(err) - } - const cids = cidsAndBlocks.map((entry) => entry.cid) + const toCancel = cidsAndBlocks + .filter((b) => this.wm.wantlist.contains(b.cid)) + .map((b) => b.cid) - this.wm.cancelWants(cids) - }) - ) + this.wm.cancelWants(toCancel) - pull( - pull.values(cidsAndBlocks), - paramap(this._handleReceivedBlock.bind(this, peerId), 10), - pull.onEnd(cb) + each( + cidsAndBlocks, + this._handleReceivedBlock.bind(this, peerId), + cb ) }) } _handleReceivedBlock (peerId, cidAndBlock, callback) { - series([ - (cb) => this._updateReceiveCounters(cidAndBlock.block, (err) => { - if (err) { - // ignore, as these have been handled - // in _updateReceiveCounters + const cid = cidAndBlock.cid + const block = cidAndBlock.block + + waterfall([ + (cb) => this.blockstore.has(cid.multihash, cb), + (exists, cb) => { + this._updateReceiveCounters(block, exists) + log('got block') + + if (exists) { return cb() } - log('got block from %s', peerId.toB58String(), cidAndBlock.block.data.length) - cb() - }), - (cb) => { - this.put(cidAndBlock, (err) => { - if (err) { - log.error('receiveMessage put error: %s', err.message) - } - cb() - }) + this._putBlockStore(cidAndBlock, cb) } ], callback) } - _updateReceiveCounters (block, callback) { + _updateReceiveCounters (block, exists) { this.blocksRecvd++ - block.key((err, key) => { - if (err) { - return callback(err) - } - this.blockstore.has(key, (err, has) => { - if (err) { - log('blockstore.has error: %s', err.message) - return callback(err) - } - - if (has) { - this.dupBlocksRecvd ++ - this.dupDataRecvd += block.data.length - return callback(new Error('Already have block')) - } - - callback() - }) - }) + if (exists) { + this.dupBlocksRecvd ++ + this.dupDataRecvd += block.data.length + } } // handle errors on the receiving channel @@ -250,24 +220,35 @@ class Bitswap { }) }), pull.filter((val) => !val[1]), - pull.map((val) => { - const block = val[0].block - const cid = val[0].cid - log('putting block') - return pull( - pull.values([{ - data: block.data, - key: cid.multihash - }]), - this.blockstore.putStream(), - pull.through(() => { - log('put block') - this.notifications.emit(`block:${cid.buffer.toString()}`, block) - this.engine.receivedBlocks([cid]) - }) - ) - }), - pull.flatten() + pull.asyncMap((val, cb) => { + this._putBlockStore(val[0], cb) + }) + ) + } + + _putBlockStore (blockAndCid, callback) { + const block = blockAndCid.block + const cid = blockAndCid.cid + const cidStr = cid.buffer.toString() + + log('putting block') + + pull( + pull.values([{ + data: block.data, + key: cid.multihash + }]), + this.blockstore.putStream(), + pull.collect((err, meta) => { + if (err) { + return callback(err) + } + + log('put block') + this.notifications.emit(`block:${cidStr}`, block) + this.engine.receivedBlocks([cid]) + callback(null, meta) + }) ) } diff --git a/src/types/message/index.js b/src/types/message/index.js index 5e9f14c3..fcb94883 100644 --- a/src/types/message/index.js +++ b/src/types/message/index.js @@ -26,7 +26,7 @@ class BitswapMessage { addEntry (cid, priority, cancel) { assert(cid && CID.isCID(cid), 'must be a valid cid') - const cidStr = cid.toBaseEncodedString() + const cidStr = cid.buffer.toString() const entry = this.wantlist.get(cidStr) @@ -40,13 +40,13 @@ class BitswapMessage { addBlock (cid, block) { assert(CID.isCID(cid), 'must be a valid cid') - const cidStr = cid.toBaseEncodedString() - this.blocks.set(cidStr, block) + const cidStr = cid.buffer.toString() + this.blocks.set(cidStr, {block: block, cid: cid}) } cancel (cid) { assert(CID.isCID(cid), 'must be a valid cid') - const cidStr = cid.toBaseEncodedString() + const cidStr = cid.buffer.toString() this.wantlist.delete(cidStr) this.addEntry(cid, 0, true) } @@ -67,7 +67,7 @@ class BitswapMessage { }) }, blocks: Array.from(this.blocks.values()) - .map((block) => block.data) + .map((block) => block.block.data) } if (this.full) { @@ -99,11 +99,10 @@ class BitswapMessage { msg.wantlist.full = true } - this.blocks.forEach((block, cidStr) => { - const cid = new CID(cidStr) + this.blocks.forEach((block) => { msg.payload.push({ - prefix: cid.prefix, - data: block.data + prefix: block.cid.prefix, + data: block.block.data }) }) diff --git a/test/components/decision-engine/index-test.js b/test/components/decision-engine/index-test.js index e09f43d8..f5e11f08 100644 --- a/test/components/decision-engine/index-test.js +++ b/test/components/decision-engine/index-test.js @@ -21,7 +21,7 @@ const mockNetwork = require('../../utils').mockNetwork function messageToString (m) { return Array.from(m[1].blocks.values()) - .map((b) => b.data.toString()) + .map((b) => b.block.data.toString()) } function stringifyMessages (messages) { diff --git a/test/index-test.js b/test/index-test.js index fd15cd83..fe1de8be 100644 --- a/test/index-test.js +++ b/test/index-test.js @@ -439,16 +439,13 @@ module.exports = (repo) => { } function hasBlocks (msg, store, cb) { - each(Array.from(msg.blocks.values()), (b, next) => { - b.key((err, key) => { + each(msg.blocks.values(), (b, next) => { + b.block.key((err, key) => { if (err) { return next(err) } - if (!b.cancel) { - store.has(key, next) - } else { - next() - } + + store.has(key, next) }) }, cb) } diff --git a/test/types/message.spec.js b/test/types/message.spec.js index 27578f9c..f80076f1 100644 --- a/test/types/message.spec.js +++ b/test/types/message.spec.js @@ -93,15 +93,16 @@ describe('BitswapMessage', () => { expect(msg.full).to.equal(true) expect(Array.from(msg.wantlist)) .to.eql([[ - cid0.toBaseEncodedString(), + cid0.buffer.toString(), new BitswapMessage.Entry(cid0, 0, false) ]]) - expect(Array.from(msg.blocks).map((b) => [b[0], b[1].data])) - .to.eql([ - [cid1.toBaseEncodedString(), b1.data], - [cid2.toBaseEncodedString(), b2.data] - ]) + expect( + Array.from(msg.blocks).map((b) => [b[0], b[1].block.data]) + ).to.eql([ + [cid1.buffer.toString(), b1.data], + [cid2.buffer.toString(), b2.data] + ]) done() }) @@ -137,15 +138,16 @@ describe('BitswapMessage', () => { expect(msg.full).to.equal(true) expect(Array.from(msg.wantlist)) .to.eql([[ - cid0.toBaseEncodedString(), + cid0.buffer.toString(), new BitswapMessage.Entry(cid0, 0, false) ]]) - expect(Array.from(msg.blocks).map((b) => [b[0], b[1].data])) - .to.eql([ - [cid1.toBaseEncodedString(), b1.data], - [cid2.toBaseEncodedString(), b2.data] - ]) + expect( + Array.from(msg.blocks).map((b) => [b[0], b[1].block.data]) + ).to.eql([ + [cid1.buffer.toString(), b1.data], + [cid2.buffer.toString(), b2.data] + ]) done() })