From 860165c496f6a00045f76c14b1cd00658bd65edb Mon Sep 17 00:00:00 2001 From: Friedel Ziegelmayer Date: Sat, 1 Apr 2017 19:23:53 +0200 Subject: [PATCH] feat: DHT integration PART I --- README.md | 3 +- package.json | 40 ++--- src/core/components/dht.js | 163 ++++++++++++++++++ src/core/components/id.js | 1 + src/core/components/index.js | 1 + src/core/components/libp2p.js | 24 +-- src/core/index.js | 5 + test/core/interface/dht.js | 22 +++ test/core/interface/interface.spec.js | 1 + test/core/kad-dht.node.js | 72 ++++++++ test/interop/{index.js => exchange-files.js} | 0 test/interop/interop-bitswap.js | 0 test/interop/interop-swarm.js | 0 test/interop/kad-dht.js | 109 ++++++++++++ test/interop/node.js | 3 +- .../ipfs-factory-instance/default-config.json | 28 --- test/utils/ipfs-factory-instance/index.js | 52 ++---- 17 files changed, 428 insertions(+), 96 deletions(-) create mode 100644 src/core/components/dht.js create mode 100644 test/core/interface/dht.js create mode 100644 test/core/kad-dht.node.js rename test/interop/{index.js => exchange-files.js} (100%) delete mode 100644 test/interop/interop-bitswap.js delete mode 100644 test/interop/interop-swarm.js create mode 100644 test/interop/kad-dht.js diff --git a/README.md b/README.md index 0642e41f91..b0c29e1871 100644 --- a/README.md +++ b/README.md @@ -232,7 +232,8 @@ const node = new IPFS({ EXPERIMENTAL: { // enable experimental features pubsub: true, sharding: true, // enable dir sharding - wrtcLinuxWindows: true // use unstable wrtc module on Linux or Windows with Node.js + wrtcLinuxWindows: true // use unstable wrtc module on Linux or Windows with Node.js, + dht: true // enable KadDHT, currently not interopable with go-ipfs }, config: { // overload the default config Addresses: { diff --git a/package.json b/package.json index 7b6566faa4..d231389202 100644 --- a/package.json +++ b/package.json @@ -31,9 +31,9 @@ "test:unit:node:http": "TEST=http npm run test:unit:node", "test:unit:node:cli": "TEST=cli npm run test:unit:node", "test:unit:browser": "gulp test:browser --dom", - "test:interop": "mocha -t 60000 test/interop", + "test:interop": "npm run test:interop:node", "test:interop:node": "mocha -t 60000 test/interop/node.js", - "test:interop:browser": "mocha test/interop/browser.js", + "test:interop:browser": "mocha -t 60000 test/interop/browser.js", "test:benchmark": "echo \"Error: no benchmarks yet\" && exit 1", "test:benchmark:node": "echo \"Error: no benchmarks yet\" && exit 1", "test:benchmark:node:core": "echo \"Error: no benchmarks yet\" && exit 1", @@ -63,23 +63,23 @@ }, "homepage": "https://github.com/ipfs/js-ipfs#readme", "devDependencies": { - "aegir": "^11.0.1", + "aegir": "^11.0.2", "buffer-loader": "0.0.1", "chai": "^3.5.0", "delay": "^2.0.0", "detect-node": "^2.0.3", - "dir-compare": "^1.3.0", + "dir-compare": "^1.4.0", "dirty-chai": "^1.2.2", - "eslint-plugin-react": "^6.10.3", + "eslint-plugin-react": "^7.0.1", "execa": "^0.6.3", "expose-loader": "^0.7.3", - "form-data": "^2.1.2", + "form-data": "^2.1.4", "gulp": "^3.9.1", - "interface-ipfs-core": "~0.27.0", + "interface-ipfs-core": "~0.27.2", "ipfsd-ctl": "~0.20.0", "left-pad": "^1.1.3", "lodash": "^4.17.4", - "mocha": "^3.2.0", + "mocha": "^3.4.1", "ncp": "^2.0.0", "nexpect": "^0.5.0", "pre-commit": "^1.2.2", @@ -91,17 +91,17 @@ "transform-loader": "^0.2.4" }, "dependencies": { - "async": "^2.3.0", - "bl": "^1.2.0", + "async": "^2.4.0", + "bl": "^1.2.1", "boom": "^4.3.1", "cids": "^0.5.0", - "debug": "^2.6.3", + "debug": "^2.6.8", "fsm-event": "^2.1.0", "glob": "^7.1.1", "hapi": "^16.1.1", "hapi-set-header": "^1.0.2", "hoek": "^4.1.1", - "ipfs-api": "^14.0.0", + "ipfs-api": "^14.0.1", "ipfs-bitswap": "~0.13.0", "ipfs-block": "~0.6.0", "ipfs-block-service": "~0.9.0", @@ -111,10 +111,10 @@ "ipfs-unixfs-engine": "~0.19.1", "ipld-resolver": "~0.11.0", "isstream": "^0.1.2", - "joi": "^10.4.1", + "joi": "^10.5.0", "libp2p-floodsub": "~0.9.4", - "libp2p-ipfs-browser": "~0.23.0", - "libp2p-ipfs-nodejs": "~0.23.0", + "libp2p-ipfs-browser": "~0.24.1", + "libp2p-ipfs-nodejs": "~0.25.2", "lodash.flatmap": "^4.5.0", "lodash.get": "^4.4.2", "lodash.has": "^4.5.2", @@ -132,21 +132,21 @@ "peer-info": "~0.9.2", "promisify-es6": "^1.0.2", "pull-file": "^1.0.0", - "pull-paramap": "^1.2.1", - "pull-pushable": "^2.0.1", + "pull-paramap": "^1.2.2", + "pull-pushable": "^2.1.1", "pull-sort": "^1.0.0", - "pull-stream": "^3.5.0", + "pull-stream": "^3.6.0", "pull-stream-to-stream": "^1.3.4", "pull-zip": "^2.0.1", "read-pkg-up": "^2.0.0", "readable-stream": "1.1.14", "safe-buffer": "^5.0.1", "stream-to-pull-stream": "^1.7.2", - "tar-stream": "^1.5.2", + "tar-stream": "^1.5.4", "temp": "^0.8.3", "through2": "^2.0.3", "update-notifier": "^2.1.0", - "yargs": "7.0.2" + "yargs": "8.0.1" }, "contributors": [ "Andrew de Andrade ", diff --git a/src/core/components/dht.js b/src/core/components/dht.js new file mode 100644 index 0000000000..0a2ce5d8e2 --- /dev/null +++ b/src/core/components/dht.js @@ -0,0 +1,163 @@ +'use strict' + +const promisify = require('promisify-es6') +const every = require('async/every') +const PeerId = require('peer-id') +const CID = require('cids') +const each = require('async/each') +// const bsplit = require('buffer-split') + +module.exports = (self) => { + return { + /** + * Given a key, query the DHT for its best value. + * + * @param {Buffer} key + * @param {function(Error)} [callback] + * @returns {Promise|void} + */ + get: promisify((key, options, callback) => { + if (!Buffer.isBuffer(key)) { + return callback(new Error('Not valid key')) + } + + if (typeof options === 'function') { + callback = options + options = {} + } + + self._libp2pNode.dht.get(key, options.timeout, callback) + }), + + /** + * Write a key/value pair to the DHT. + * + * Given a key of the form /foo/bar and a value of any + * form, this will write that value to the DHT with + * that key. + * + * @param {Buffer} key + * @param {Buffer} value + * @param {function(Error)} [callback] + * @returns {Promise|void} + */ + put: promisify((key, value, callback) => { + if (!Buffer.isBuffer(key)) { + return callback(new Error('Not valid key')) + } + + self._libp2pNode.dht.put(key, value, callback) + }), + + /** + * Find peers in the DHT that can provide a specific value, given a key. + * + * @param {CID} key - They key to find providers for. + * @param {function(Error, Array)} [callback] + * @returns {Promise|void} + */ + findprovs: promisify((key, callback) => { + if (typeof key === 'string') { + key = new CID(key) + } + + self._libp2pNode.contentRouting.findProviders(key, callback) + }), + + /** + * Query the DHT for all multiaddresses associated with a `PeerId`. + * + * @param {PeerId} peer - The id of the peer to search for. + * @param {function(Error, Array)} [callback] + * @returns {Promise>|void} + */ + findpeer: promisify((peer, callback) => { + if (typeof peer === 'string') { + peer = PeerId.createFromB58String(peer) + } + + self._libp2pNode.peerRouting.findPeer(peer, (err, info) => { + if (err) { + return callback(err) + } + + // convert to go-ipfs return value, we need to revisit + // this. For now will just conform. + const goResult = [ + { + Responses: [{ + ID: info.id.toB58String(), + Addresses: info.multiaddrs.toArray().map((a) => a.toString()) + }] + } + ] + + callback(null, goResult) + }) + }), + + /** + * Announce to the network that we are providing given values. + * + * @param {CID|Array} keys - The keys that should be announced. + * @param {Object} [options={}] + * @param {bool} [options.recursive=false] - Provide not only the given object but also all objects linked from it. + * @param {function(Error)} [callback] + * @returns {Promise|void} + */ + provide: promisify((keys, options, callback) => { + if (!Array.isArray(keys)) { + keys = [keys] + } + if (typeof options === 'function') { + callback = options + options = {} + } + + // ensure blocks are actually local + every(keys, (key, cb) => { + self._repo.blockstore.has(key, cb) + }, (err, has) => { + if (err) { + return callback(err) + } + /* TODO reconsider this. go-ipfs provides anyway + if (!has) { + return callback(new Error('Not all blocks exist locally, can not provide')) + } + */ + + if (options.recursive) { + // TODO: Implement recursive providing + } else { + each(keys, (cid, cb) => { + self._libp2pNode.contentRouting.provide(cid, cb) + }, callback) + } + }) + }), + + /** + * Find the closest peers to a given `PeerId`, by querying the DHT. + * + * @param {PeerId} peer - The `PeerId` to run the query agains. + * @param {function(Error, Array)} [callback] + * @returns {Promise>|void} + */ + query: promisify((peerId, callback) => { + if (typeof peerId === 'string') { + peerId = PeerId.createFromB58String(peerId) + } + + // TODO expose this method in peerRouting + self._libp2pNode._dht.getClosestPeers(peerId.toBytes(), (err, peerIds) => { + if (err) { + return callback(err) + } + callback(null, peerIds.map((id) => { + return { ID: id.toB58String() } + })) + }) + }) + } +} diff --git a/src/core/components/id.js b/src/core/components/id.js index a9274eefbd..20ec901564 100644 --- a/src/core/components/id.js +++ b/src/core/components/id.js @@ -15,6 +15,7 @@ module.exports = function id (self) { addresses: self._peerInfo.multiaddrs .toArray() .map((ma) => ma.toString()) + .filter((ma) => ma.indexOf('ipfs') >= 0) .sort(), agentVersion: 'js-ipfs', protocolVersion: '9000' diff --git a/src/core/components/index.js b/src/core/components/index.js index 8eca1acb30..e4fb45464f 100644 --- a/src/core/components/index.js +++ b/src/core/components/index.js @@ -19,3 +19,4 @@ exports.ping = require('./ping') exports.files = require('./files') exports.bitswap = require('./bitswap') exports.pubsub = require('./pubsub') +exports.dht = require('./dht') diff --git a/src/core/components/libp2p.js b/src/core/components/libp2p.js index 385c9ac69f..c0dbb79b59 100644 --- a/src/core/components/libp2p.js +++ b/src/core/components/libp2p.js @@ -17,11 +17,23 @@ module.exports = function libp2p (self) { const options = { mdns: get(config, 'Discovery.MDNS.Enabled'), webRTCStar: get(config, 'Discovery.webRTCStar.Enabled'), - bootstrap: get(config, 'Bootstrap') + bootstrap: get(config, 'Bootstrap'), + dht: self._options.EXPERIMENTAL.dht } self._libp2pNode = new Node(self._peerInfo, self._peerInfoBook, options) + self._libp2pNode.on('peer:discovery', (peerInfo) => { + if (self.isOnline()) { + self._peerInfoBook.put(peerInfo) + self._libp2pNode.dial(peerInfo, () => {}) + } + }) + + self._libp2pNode.on('peer:connect', (peerInfo) => { + self._peerInfoBook.put(peerInfo) + }) + self._libp2pNode.start((err) => { if (err) { return callback(err) @@ -31,16 +43,6 @@ module.exports = function libp2p (self) { console.log('Swarm listening on', ma.toString()) }) - self._libp2pNode.on('peer:discovery', (peerInfo) => { - if (self.isOnline()) { - self._peerInfoBook.put(peerInfo) - self._libp2pNode.dial(peerInfo, () => {}) - } - }) - self._libp2pNode.on('peer:connect', (peerInfo) => { - self._peerInfoBook.put(peerInfo) - }) - callback() }) } diff --git a/src/core/index.js b/src/core/index.js index f0b7c0c815..3c8c0791a0 100644 --- a/src/core/index.js +++ b/src/core/index.js @@ -93,6 +93,7 @@ class IPFS extends EventEmitter { this.bitswap = components.bitswap(this) this.ping = components.ping(this) this.pubsub = components.pubsub(this) + this.dht = components.dht(this) if (this._options.EXPERIMENTAL.pubsub) { this.log('EXPERIMENTAL pubsub is enabled') @@ -100,6 +101,10 @@ class IPFS extends EventEmitter { if (this._options.EXPERIMENTAL.sharding) { this.log('EXPERIMENTAL sharding is enabled') } + if (this._options.EXPERIMENTAL.dht) { + this.log('EXPERIMENTAL Kademlia DHT is enabled') + } + this.state = require('./state')(this) boot(this) diff --git a/test/core/interface/dht.js b/test/core/interface/dht.js new file mode 100644 index 0000000000..28e6bb7658 --- /dev/null +++ b/test/core/interface/dht.js @@ -0,0 +1,22 @@ +/* eslint-env mocha */ + +'use strict' + +/* +const test = require('interface-ipfs-core') +const IPFSFactory = require('../../utils/ipfs-factory-instance') + +let factory + +const common = { + setup: function (callback) { + factory = new IPFSFactory() + callback(null, factory) + }, + teardown: function (callback) { + factory.dismantle(callback) + } +} + +test.dht(common) +*/ diff --git a/test/core/interface/interface.spec.js b/test/core/interface/interface.spec.js index bf749464d1..23e7dab09f 100644 --- a/test/core/interface/interface.spec.js +++ b/test/core/interface/interface.spec.js @@ -14,5 +14,6 @@ describe('interface-ipfs-core tests', () => { if (isNode) { require('./swarm') require('./pubsub') + require('./dht') } }) diff --git a/test/core/kad-dht.node.js b/test/core/kad-dht.node.js new file mode 100644 index 0000000000..4f17733b38 --- /dev/null +++ b/test/core/kad-dht.node.js @@ -0,0 +1,72 @@ +/* eslint max-nested-callbacks: ["error", 8] */ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) +const bl = require('bl') +const parallel = require('async/parallel') +const Buffer = require('safe-buffer') +const IPFSFactory = require('../utils/ipfs-factory-instance') + +describe('verify that kad-dht is doing its thing', () => { + let factory + let nodeA + let nodeB + let nodeC + // let addrA + let addrB + let addrC + + before((done) => { + factory = new IPFSFactory() + parallel([ + (cb) => factory.spawnNode(cb), + (cb) => factory.spawnNode(cb), + (cb) => factory.spawnNode(cb) + ], (err, nodes) => { + expect(err).to.not.exist() + nodeA = nodes[0] + nodeB = nodes[1] + nodeC = nodes[2] + parallel([ + (cb) => nodeA.id(cb), + (cb) => nodeB.id(cb), + (cb) => nodeC.id(cb) + ], (err, ids) => { + expect(err).to.not.exist() + // addrA = ids[0].addresses[0] + addrB = ids[1].addresses[0] + addrC = ids[2].addresses[0] + parallel([ + (cb) => nodeA.swarm.connect(addrB, cb), + (cb) => nodeB.swarm.connect(addrC, cb) + ], done) + }) + }) + }) + + after((done) => factory.dismantle(done)) + + it('add a file in C, fetch through B in A', (done) => { + const file = { + path: 'testfile.txt', + content: Buffer.from('hello kad') + } + + nodeC.files.add(file, (err, res) => { + expect(err).to.not.exist() + nodeA.files.cat(res[0].hash, (err, stream) => { + expect(err).to.not.exist() + stream.pipe(bl((err, data) => { + expect(err).to.not.exist() + console.log(data.toString()) + expect(data).to.eql(new Buffer('hello kad')) + done() + })) + }) + }) + }) +}) diff --git a/test/interop/index.js b/test/interop/exchange-files.js similarity index 100% rename from test/interop/index.js rename to test/interop/exchange-files.js diff --git a/test/interop/interop-bitswap.js b/test/interop/interop-bitswap.js deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/test/interop/interop-swarm.js b/test/interop/interop-swarm.js deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/test/interop/kad-dht.js b/test/interop/kad-dht.js new file mode 100644 index 0000000000..a9564fb269 --- /dev/null +++ b/test/interop/kad-dht.js @@ -0,0 +1,109 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) +const series = require('async/series') +const crypto = require('crypto') +const parallel = require('async/parallel') +const waterfall = require('async/waterfall') +const bl = require('bl') + +const GODaemon = require('./daemons/go') +const JSDaemon = require('./daemons/js') + +describe.skip('kad-dht', () => { + describe('a JS node in the land of Go', () => { + let jsD + let goD1 + let goD2 + let goD3 + + before((done) => { + goD1 = new GODaemon() + goD2 = new GODaemon() + goD3 = new GODaemon() + + jsD = new JSDaemon({ port: 40 }) + + parallel([ + (cb) => goD1.start(cb), + (cb) => goD2.start(cb), + (cb) => goD3.start(cb), + (cb) => jsD.start(cb) + ], done) + }) + + after((done) => { + series([ + (cb) => goD1.stop(cb), + (cb) => goD2.stop(cb), + (cb) => goD3.stop(cb), + (cb) => jsD.stop(cb) + ], done) + }) + + it('make connections', (done) => { + parallel([ + (cb) => jsD.api.id(cb), + (cb) => goD1.api.id(cb), + (cb) => goD2.api.id(cb), + (cb) => goD3.api.id(cb) + ], (err, ids) => { + expect(err).to.not.exist() + parallel([ + (cb) => jsD.api.swarm.connect(ids[1].addresses[0], cb), + (cb) => goD1.api.swarm.connect(ids[2].addresses[0], cb), + (cb) => goD2.api.swarm.connect(ids[3].addresses[0], cb) + ], done) + }) + }) + + it('one hop', (done) => { + const data = crypto.randomBytes(9001) + + waterfall([ + (cb) => goD1.api.add(data, cb), + (res, cb) => jsD.api.cat(res[0].hash, cb), + (stream, cb) => stream.pipe(bl(cb)) + ], (err, file) => { + expect(err).to.not.exist() + expect(file).to.be.eql(data) + done() + }) + }) + + it('two hops', (done) => { + const data = crypto.randomBytes(9001) + + waterfall([ + (cb) => goD2.api.add(data, cb), + (res, cb) => jsD.api.cat(res[0].hash, cb), + (stream, cb) => stream.pipe(bl(cb)) + ], (err, file) => { + expect(err).to.not.exist() + expect(file).to.be.eql(data) + done() + }) + }) + + it('three hops', (done) => { + const data = crypto.randomBytes(9001) + + waterfall([ + (cb) => goD3.api.add(data, cb), + (res, cb) => jsD.api.cat(res[0].hash, cb), + (stream, cb) => stream.pipe(bl(cb)) + ], (err, file) => { + expect(err).to.not.exist() + expect(file).to.be.eql(data) + done() + }) + }) + }) + + describe('a Go node in the land of JS', () => {}) + describe('hybrid', () => {}) +}) diff --git a/test/interop/node.js b/test/interop/node.js index eb02fccaa1..4ebf41db24 100644 --- a/test/interop/node.js +++ b/test/interop/node.js @@ -2,6 +2,7 @@ 'use strict' describe('interop', () => { - require('./index') + require('./exchange-files') + require('./kad-dht') require('./repo') }) diff --git a/test/utils/ipfs-factory-instance/default-config.json b/test/utils/ipfs-factory-instance/default-config.json index 47eac6522a..8428849538 100644 --- a/test/utils/ipfs-factory-instance/default-config.json +++ b/test/utils/ipfs-factory-instance/default-config.json @@ -1,8 +1,4 @@ { - "Identity": { - "PeerID": "", - "PrivKey": "" - }, "Datastore": { "Type": "", "Path": "", @@ -19,11 +15,6 @@ "API": "/ip4/127.0.0.1/tcp/0", "Gateway": "/ip4/127.0.0.1/tcp/0" }, - "Mounts": { - "IPFS": "/ipfs", - "IPNS": "/ipns", - "FuseAllowOther": false - }, "Version": { "Current": "jsipfs-dev", "Check": "error", @@ -46,26 +37,7 @@ "ResolveCacheSize": 128 }, "Bootstrap": [], - "Tour": { - "Last": "" - }, - "Gateway": { - "HTTPHeaders": null, - "RootRedirect": "", - "Writable": false - }, - "SupernodeRouting": { - "Servers": [] - }, "API": { "HTTPHeaders": null - }, - "Swarm": { - "AddrFilters": null - }, - "Log": { - "MaxSizeMB": 250, - "MaxBackups": 1, - "MaxAgeDays": 0 } } diff --git a/test/utils/ipfs-factory-instance/index.js b/test/utils/ipfs-factory-instance/index.js index ec1f3d410d..2abdcbf115 100644 --- a/test/utils/ipfs-factory-instance/index.js +++ b/test/utils/ipfs-factory-instance/index.js @@ -1,6 +1,5 @@ 'use strict' -const PeerId = require('peer-id') const series = require('async/series') const each = require('async/each') @@ -34,45 +33,28 @@ function Factory () { .substring(2, 8) } - createConfig(config, (err, config) => { - if (err) { - return callback(err) + config = config || defaultConfig + + const repo = createTempRepo(repoPath) + const node = new IPFS({ + repo: repo, + init: { + bits: 1024 + }, + config: config, + EXPERIMENTAL: { + pubsub: true, + dht: true } + }) - // create the IPFS node - const repo = createTempRepo(repoPath) - const node = new IPFS({ + node.once('ready', () => { + nodes.push({ repo: repo, - config: config, - EXPERIMENTAL: { - pubsub: true - } - }) - - node.once('start', () => { - nodes.push({ repo: repo, ipfs: node }) - callback(null, node) + ipfs: node }) + callback(null, node) }) - - function createConfig (config, cb) { - if (config) { - return cb(null, config) - } - - config = JSON.parse(JSON.stringify(defaultConfig)) - - PeerId.create({ bits: 1024 }, (err, id) => { - if (err) { - return cb(err) - } - - const pId = id.toJSON() - config.Identity.PeerID = pId.id - config.Identity.PrivKey = pId.privKey - cb(null, config) - }) - } } this.dismantle = function (callback) {