diff --git a/README.md b/README.md index 9ffe6e8c82..20c53757a6 100644 --- a/README.md +++ b/README.md @@ -303,6 +303,7 @@ Configure remote preload nodes. The remote will preload content added on this no Enable and configure experimental features. - `pubsub` (boolean): Enable libp2p pub-sub. (Default: `false`) +- `ipnsPubsub` (boolean): Enable pub-sub on IPNS. (Default: `false`) - `sharding` (boolean): Enable directory sharding. Directories that have many child objects will be represented by multiple DAG nodes instead of just one. It can improve lookup performance when a directory has several thousand files or more. (Default: `false`) - `dht` (boolean): Enable KadDHT. **This is currently not interoperable with `go-ipfs`.** @@ -561,6 +562,9 @@ The core API is grouped into several areas: - [name](https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/NAME.md) - [`ipfs.name.publish(value, [options], [callback])`](https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/NAME.md#namepublish) + - [`ipfs.name.pubsub.cancel(arg, [callback])`](https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/NAME.md#namepubsubcancel) + - [`ipfs.name.pubsub.state([callback])`](https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/NAME.md#namepubsubstate) + - [`ipfs.name.pubsub.subs([callback])`](https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/NAME.md#namepubsubsubs) - [`ipfs.name.resolve(value, [options], [callback])`](https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/NAME.md#nameresolve) #### Crypto and Key Management diff --git a/package.json b/package.json index a1a3b40fd4..f7c4f5ef9d 100644 --- a/package.json +++ b/package.json @@ -69,7 +69,7 @@ "execa": "^1.0.0", "form-data": "^2.3.3", "hat": "0.0.3", - "interface-ipfs-core": "~0.88.0", + "interface-ipfs-core": "~0.89.0", "ipfsd-ctl": "~0.40.1", "ncp": "^2.0.0", "qs": "^6.5.2", @@ -88,7 +88,9 @@ "byteman": "^1.3.5", "cid-tool": "~0.2.0", "cids": "~0.5.5", + "class-is": "^1.1.0", "datastore-core": "~0.6.0", + "datastore-pubsub": "~0.1.1", "debug": "^4.1.0", "deep-extend": "~0.6.0", "err-code": "^1.1.2", @@ -118,7 +120,7 @@ "ipld-ethereum": "^2.0.1", "ipld-git": "~0.2.2", "ipld-zcash": "~0.1.6", - "ipns": "~0.3.0", + "ipns": "~0.4.3", "is-ipfs": "~0.4.7", "is-pull-stream": "~0.0.0", "is-stream": "^1.1.0", diff --git a/src/cli/commands/daemon.js b/src/cli/commands/daemon.js index cca6845bf5..e71d3a5d38 100644 --- a/src/cli/commands/daemon.js +++ b/src/cli/commands/daemon.js @@ -27,6 +27,9 @@ module.exports = { }) .option('local', { desc: 'Run commands locally to the daemon', + default: false + }) + .option('enable-namesys-pubsub', { type: 'boolean', default: false }) diff --git a/src/cli/commands/name/pubsub.js b/src/cli/commands/name/pubsub.js new file mode 100644 index 0000000000..218afb6c4b --- /dev/null +++ b/src/cli/commands/name/pubsub.js @@ -0,0 +1,18 @@ +'use strict' + +/* +Manage and inspect the state of the IPNS pubsub resolver. +Note: this command is experimental and subject to change as the system is refined. +*/ +module.exports = { + command: 'pubsub', + + description: 'IPNS pubsub management.', + + builder (yargs) { + return yargs.commandDir('pubsub') + }, + + handler (argv) { + } +} diff --git a/src/cli/commands/name/pubsub/cancel.js b/src/cli/commands/name/pubsub/cancel.js new file mode 100644 index 0000000000..5f0d709294 --- /dev/null +++ b/src/cli/commands/name/pubsub/cancel.js @@ -0,0 +1,19 @@ +'use strict' + +const print = require('../../../utils').print + +module.exports = { + command: 'cancel ', + + describe: 'Cancel a name subscription.', + + handler (argv) { + argv.ipfs.name.pubsub.cancel(argv.name, (err, result) => { + if (err) { + throw err + } else { + print(result.canceled ? 'canceled' : 'no subscription') + } + }) + } +} diff --git a/src/cli/commands/name/pubsub/state.js b/src/cli/commands/name/pubsub/state.js new file mode 100644 index 0000000000..08c65fbed1 --- /dev/null +++ b/src/cli/commands/name/pubsub/state.js @@ -0,0 +1,19 @@ +'use strict' + +const print = require('../../../utils').print + +module.exports = { + command: 'state', + + describe: 'Query the state of IPNS pubsub.', + + handler (argv) { + argv.ipfs.name.pubsub.state((err, result) => { + if (err) { + throw err + } else { + print(result.enabled ? 'enabled' : 'disabled') + } + }) + } +} diff --git a/src/cli/commands/name/pubsub/subs.js b/src/cli/commands/name/pubsub/subs.js new file mode 100644 index 0000000000..ced5682626 --- /dev/null +++ b/src/cli/commands/name/pubsub/subs.js @@ -0,0 +1,21 @@ +'use strict' + +const print = require('../../../utils').print + +module.exports = { + command: 'subs', + + describe: 'Show current name subscriptions.', + + handler (argv) { + argv.ipfs.name.pubsub.subs((err, result) => { + if (err) { + throw err + } else { + result.forEach((s) => { + print(s) + }) + } + }) + } +} diff --git a/src/core/components/name-pubsub.js b/src/core/components/name-pubsub.js new file mode 100644 index 0000000000..6e92a23387 --- /dev/null +++ b/src/core/components/name-pubsub.js @@ -0,0 +1,92 @@ +'use strict' + +const debug = require('debug') +const errcode = require('err-code') +const promisify = require('promisify-es6') + +const IpnsPubsubDatastore = require('../ipns/routing/pubsub-datastore') + +const log = debug('jsipfs:name-pubsub') +log.error = debug('jsipfs:name-pubsub:error') + +// Is pubsub enabled +const isNamePubsubEnabled = (node) => { + try { + return Boolean(getPubsubRouting(node)) + } catch (err) { + return false + } +} + +// Get pubsub from IPNS routing +const getPubsubRouting = (node) => { + if (!node._ipns || !node._options.EXPERIMENTAL.ipnsPubsub) { + const errMsg = 'IPNS pubsub subsystem is not enabled' + + throw errcode(errMsg, 'ERR_IPNS_PUBSUB_NOT_ENABLED') + } + + // Only one store and it is pubsub + if (IpnsPubsubDatastore.isIpnsPubsubDatastore(node._ipns.routing)) { + return node._ipns.routing + } + + // Find in tiered + const pubsub = (node._ipns.routing.stores || []).find(s => IpnsPubsubDatastore.isIpnsPubsubDatastore(s)) + + if (!pubsub) { + const errMsg = 'IPNS pubsub datastore not found' + + throw errcode(errMsg, 'ERR_PUBSUB_DATASTORE_NOT_FOUND') + } + + return pubsub +} + +module.exports = function namePubsub (self) { + return { + /** + * Query the state of IPNS pubsub. + * + * @returns {Promise|void} + */ + state: promisify((callback) => { + callback(null, { + enabled: isNamePubsubEnabled(self) + }) + }), + /** + * Cancel a name subscription. + * + * @param {String} name subscription name. + * @param {function(Error)} [callback] + * @returns {Promise|void} + */ + cancel: promisify((name, callback) => { + let pubsub + try { + pubsub = getPubsubRouting(self) + } catch (err) { + return callback(err) + } + + pubsub.cancel(name, callback) + }), + /** + * Show current name subscriptions. + * + * @param {function(Error)} [callback] + * @returns {Promise|void} + */ + subs: promisify((callback) => { + let pubsub + try { + pubsub = getPubsubRouting(self) + } catch (err) { + return callback(err) + } + + pubsub.getSubscriptions(callback) + }) + } +} diff --git a/src/core/components/name.js b/src/core/components/name.js index 8acb5c89c2..6c734a35b2 100644 --- a/src/core/components/name.js +++ b/src/core/components/name.js @@ -11,6 +11,7 @@ const errcode = require('err-code') const log = debug('jsipfs:name') log.error = debug('jsipfs:name:error') +const namePubsub = require('./name-pubsub') const utils = require('../utils') const path = require('../ipns/path') @@ -161,6 +162,7 @@ module.exports = function name (self) { } self._ipns.resolve(name, resolveOptions, callback) - }) + }), + pubsub: namePubsub(self) } } diff --git a/src/core/components/start.js b/src/core/components/start.js index 518d613b0e..8a8fe10a85 100644 --- a/src/core/components/start.js +++ b/src/core/components/start.js @@ -2,11 +2,13 @@ const series = require('async/series') const Bitswap = require('ipfs-bitswap') +const get = require('lodash/get') const setImmediate = require('async/setImmediate') const promisify = require('promisify-es6') const { TieredDatastore } = require('datastore-core') const IPNS = require('../ipns') +const PubsubDatastore = require('../ipns/routing/pubsub-datastore') const OfflineDatastore = require('../ipns/routing/offline-datastore') module.exports = (self) => { @@ -41,7 +43,16 @@ module.exports = (self) => { // Setup online routing for IPNS with a tiered routing composed by a DHT and a Pubsub router (if properly enabled) const ipnsStores = [] - // TODO Add IPNS pubsub if enabled + // Add IPNS pubsub if enabled + let pubsubDs + if (get(self._options, 'EXPERIMENTAL.ipnsPubsub', false)) { + const pubsub = self._libp2pNode.pubsub + const localDatastore = self._repo.datastore + const peerId = self._peerInfo.id + + pubsubDs = new PubsubDatastore(pubsub, localDatastore, peerId) + ipnsStores.push(pubsubDs) + } // NOTE: IPNS routing is being replaced by the local repo datastore while the IPNS over DHT is not ready // When DHT is added, if local option enabled, should receive offlineDatastore as well diff --git a/src/core/config.js b/src/core/config.js index a39432d53e..acad9d877f 100644 --- a/src/core/config.js +++ b/src/core/config.js @@ -29,6 +29,7 @@ const schema = Joi.object().keys({ }).allow(null), EXPERIMENTAL: Joi.object().keys({ pubsub: Joi.boolean(), + ipnsPubsub: Joi.boolean(), sharding: Joi.boolean(), dht: Joi.boolean() }).allow(null), diff --git a/src/core/index.js b/src/core/index.js index 566cec8404..8425cab7a2 100644 --- a/src/core/index.js +++ b/src/core/index.js @@ -165,6 +165,14 @@ class IPFS extends EventEmitter { if (this._options.EXPERIMENTAL.pubsub) { this.log('EXPERIMENTAL pubsub is enabled') } + if (this._options.EXPERIMENTAL.ipnsPubsub) { + if (!this._options.EXPERIMENTAL.pubsub) { + this.log('EXPERIMENTAL pubsub is enabled to use IPNS pubsub') + this._options.EXPERIMENTAL.pubsub = true + } + + this.log('EXPERIMENTAL IPNS pubsub is enabled') + } if (this._options.EXPERIMENTAL.sharding) { this.log('EXPERIMENTAL sharding is enabled') } diff --git a/src/core/ipns/publisher.js b/src/core/ipns/publisher.js index 9e8fcdf3f3..4be4fc41bb 100644 --- a/src/core/ipns/publisher.js +++ b/src/core/ipns/publisher.js @@ -1,7 +1,6 @@ 'use strict' const PeerId = require('peer-id') -const Record = require('libp2p-record').Record const { Key } = require('interface-datastore') const series = require('async/series') const errcode = require('err-code') @@ -97,19 +96,17 @@ class IpnsPublisher { return callback(errcode(new Error(errMsg), 'ERR_INVALID_DATASTORE_KEY')) } - let rec + let entryData try { // Marshal record - const entryData = ipns.marshal(entry) - // Marshal to libp2p record - rec = new Record(key.toBuffer(), entryData) + entryData = ipns.marshal(entry) } catch (err) { log.error(err) return callback(err) } // Add record to routing (buffer key) - this._routing.put(key.toBuffer(), rec.serialize(), (err, res) => { + this._routing.put(key.toBuffer(), entryData, (err, res) => { if (err) { const errMsg = `ipns record for ${key.toString()} could not be stored in the routing` @@ -137,17 +134,8 @@ class IpnsPublisher { return callback(errcode(new Error(errMsg), 'ERR_UNDEFINED_PARAMETER')) } - let rec - try { - // Marshal to libp2p record - rec = new Record(key.toBuffer(), publicKey.bytes) - } catch (err) { - log.error(err) - return callback(err) - } - // Add public key to routing (buffer key) - this._routing.put(key.toBuffer(), rec.serialize(), (err, res) => { + this._routing.put(key.toBuffer(), publicKey.bytes, (err, res) => { if (err) { const errMsg = `public key for ${key.toString()} could not be stored in the routing` diff --git a/src/core/ipns/resolver.js b/src/core/ipns/resolver.js index f9fb81ccd5..8ff9c38ab3 100644 --- a/src/core/ipns/resolver.js +++ b/src/core/ipns/resolver.js @@ -1,7 +1,6 @@ 'use strict' const ipns = require('ipns') -const Record = require('libp2p-record').Record const PeerId = require('peer-id') const errcode = require('err-code') @@ -119,8 +118,7 @@ class IpnsResolver { let ipnsEntry try { - const record = Record.deserialize(res) - ipnsEntry = ipns.unmarshal(record.value) + ipnsEntry = ipns.unmarshal(res) } catch (err) { const errMsg = `found ipns record that we couldn't convert to a value` diff --git a/src/core/ipns/routing/offline-datastore.js b/src/core/ipns/routing/offline-datastore.js index 26de52528c..d561a5665f 100644 --- a/src/core/ipns/routing/offline-datastore.js +++ b/src/core/ipns/routing/offline-datastore.js @@ -1,6 +1,7 @@ 'use strict' const { Key } = require('interface-datastore') +const { Record } = require('libp2p-record') const { encodeBase32 } = require('./utils') const errcode = require('err-code') @@ -48,7 +49,10 @@ class OfflineDatastore { return callback(errcode(new Error(errMsg), 'ERR_GENERATING_ROUTING_KEY')) } - this._repo.datastore.put(routingKey, value, callback) + // Marshal to libp2p record as the DHT does + const record = new Record(key, value) + + this._repo.datastore.put(routingKey, record.serialize(), callback) } /** @@ -76,7 +80,22 @@ class OfflineDatastore { return callback(errcode(new Error(errMsg), 'ERR_GENERATING_ROUTING_KEY')) } - this._repo.datastore.get(routingKey, callback) + this._repo.datastore.get(routingKey, (err, res) => { + if (err) { + return callback(err) + } + + // Unmarshal libp2p record as the DHT does + let record + try { + record = Record.deserialize(res) + } catch (err) { + log.error(err) + return callback(err) + } + + callback(null, record.value) + }) } // encode key properly - base32(/ipns/{cid}) diff --git a/src/core/ipns/routing/pubsub-datastore.js b/src/core/ipns/routing/pubsub-datastore.js new file mode 100644 index 0000000000..10cfad8647 --- /dev/null +++ b/src/core/ipns/routing/pubsub-datastore.js @@ -0,0 +1,146 @@ +'use strict' + +const ipns = require('ipns') +const { fromB58String, toB58String } = require('multihashes') +const PubsubDatastore = require('datastore-pubsub') + +const withIs = require('class-is') + +const errcode = require('err-code') +const debug = require('debug') +const log = debug('jsipfs:ipns:pubsub') +log.error = debug('jsipfs:ipns:pubsub:error') + +// Pubsub datastore aims to manage the pubsub subscriptions for IPNS +class IpnsPubsubDatastore { + constructor (pubsub, localDatastore, peerId) { + this._pubsub = pubsub + this._subscriptions = {} + + // Bind _handleSubscriptionKey function, which is called by PubsubDatastore. + this._handleSubscriptionKey = this._handleSubscriptionKey.bind(this) + this._pubsubDs = new PubsubDatastore(pubsub, localDatastore, peerId, ipns.validator, this._handleSubscriptionKey) + } + + /** + * Put a value to the pubsub datastore indexed by the received key properly encoded. + * @param {Buffer} key identifier of the value. + * @param {Buffer} value value to be stored. + * @param {function(Error)} callback + * @returns {void} + */ + put (key, value, callback) { + this._pubsubDs.put(key, value, callback) + } + + /** + * Get a value from the pubsub datastore indexed by the received key properly encoded. + * Moreover, the identifier topic is subscribed and the pubsub datastore records will be + * updated once new publishes occur. + * @param {Buffer} key identifier of the value to be obtained. + * @param {function(Error, Buffer)} callback + * @returns {void} + */ + get (key, callback) { + this._pubsubDs.get(key, (err, res) => { + // Add topic subscribed + const ns = key.slice(0, ipns.namespaceLength) + + if (ns.toString() === ipns.namespace) { + const stringifiedTopic = key.toString() + const id = toB58String(key.slice(ipns.namespaceLength)) + + this._subscriptions[stringifiedTopic] = id + + log(`subscribed pubsub ${stringifiedTopic}: ${id}`) + } + + // If no data was obtained, after storing the subscription, return the error. + if (err) { + return callback(err) + } + + callback(null, res) + }) + } + + // Modify subscription key to have a proper encoding + _handleSubscriptionKey (key, callback) { + const subscriber = this._subscriptions[key] + + if (!subscriber) { + const errMsg = `key ${key} does not correspond to a subscription` + + log.error(errMsg) + return callback(errcode(new Error(errMsg), 'ERR_INVALID_KEY')) + } + + let keys + try { + keys = ipns.getIdKeys(fromB58String(subscriber)) + } catch (err) { + log.error(err) + return callback(err) + } + + callback(null, keys.routingKey.toBuffer()) + } + + /** + * Get pubsub subscriptions related to ipns. + * @param {function(Error, Object)} callback + * @returns {void} + */ + getSubscriptions (callback) { + const subscriptions = Object.values(this._subscriptions).filter(Boolean) + + return callback(null, subscriptions.map((sub) => `${ipns.namespace}${sub}`)) + } + + /** + * Cancel pubsub subscriptions related to ipns. + * @param {String} name ipns path to cancel the pubsub subscription. + * @param {function(Error, Object)} callback + * @returns {void} + */ + cancel (name, callback) { + if (typeof name !== 'string') { + const errMsg = `received subscription name is not valid` + + log.error(errMsg) + return callback(errcode(new Error(errMsg), 'ERR_INVALID_SUBSCRIPTION_NAME')) + } + + // Trim /ipns/ prefix from the name + if (name.startsWith(ipns.namespace)) { + name = name.substring(ipns.namespaceLength) + } + + const stringifiedTopic = Object.keys(this._subscriptions).find((key) => this._subscriptions[key] === name) + + // Not found topic + if (!stringifiedTopic) { + return callback(null, { + canceled: false + }) + } + + // Unsubscribe topic + try { + const bufTopic = Buffer.from(stringifiedTopic) + + this._pubsubDs.unsubscribe(bufTopic) + } catch (err) { + return callback(err) + } + + this._subscriptions[stringifiedTopic] = undefined + log(`unsubscribed pubsub ${stringifiedTopic}: ${name}`) + + callback(null, { + canceled: true + }) + } +} + +exports = module.exports = withIs(IpnsPubsubDatastore, { className: 'IpnsPubsubDatastore', symbolName: '@js-ipfs/ipns/IpnsPubsubDatastore' }) diff --git a/src/http/api/resources/name.js b/src/http/api/resources/name.js index 23fc8bbab1..be2c9eaad5 100644 --- a/src/http/api/resources/name.js +++ b/src/http/api/resources/name.js @@ -59,3 +59,66 @@ exports.publish = { }) } } + +exports.pubsub = { + state: { + handler: (request, reply) => { + const ipfs = request.server.app.ipfs + + ipfs.name.pubsub.state((err, res) => { + if (err) { + return reply({ + Message: err.toString(), + Code: 0 + }).code(500) + } + + return reply({ + Enabled: res.enabled + }).code(200) + }) + } + }, + subs: { + handler: (request, reply) => { + const ipfs = request.server.app.ipfs + + ipfs.name.pubsub.subs((err, res) => { + if (err) { + return reply({ + Message: err.toString(), + Code: 0 + }).code(500) + } + + return reply({ + Strings: res + }).code(200) + }) + } + }, + cancel: { + validate: { + query: Joi.object().keys({ + arg: Joi.string().required() + }).unknown() + }, + handler: (request, reply) => { + const ipfs = request.server.app.ipfs + const { arg } = request.query + + ipfs.name.pubsub.cancel(arg, (err, res) => { + if (err) { + return reply({ + Message: err.toString(), + Code: 0 + }).code(500) + } + + return reply({ + Canceled: res.canceled + }).code(200) + }) + } + } +} diff --git a/src/http/api/routes/name.js b/src/http/api/routes/name.js index 29647f3a6a..f49d0bd6ca 100644 --- a/src/http/api/routes/name.js +++ b/src/http/api/routes/name.js @@ -22,4 +22,29 @@ module.exports = (server) => { validate: resources.name.publish.validate } }) + + api.route({ + method: '*', + path: '/api/v0/name/pubsub/state', + config: { + handler: resources.name.pubsub.state.handler + } + }) + + api.route({ + method: '*', + path: '/api/v0/name/pubsub/subs', + config: { + handler: resources.name.pubsub.subs.handler + } + }) + + api.route({ + method: '*', + path: '/api/v0/name/pubsub/cancel', + config: { + handler: resources.name.pubsub.cancel.handler, + validate: resources.name.pubsub.cancel.validate + } + }) } diff --git a/src/http/index.js b/src/http/index.js index 2d7781fc3f..dd6b19b430 100644 --- a/src/http/index.js +++ b/src/http/index.js @@ -21,6 +21,7 @@ function uriToMultiaddr (uri) { } function HttpApi (repo, config, cliArgs) { + cliArgs = cliArgs || {} this.node = undefined this.server = undefined @@ -71,12 +72,13 @@ function HttpApi (repo, config, cliArgs) { init: init, start: true, config: config, - local: cliArgs && cliArgs.local, - pass: cliArgs && cliArgs.pass, + local: cliArgs.local, + pass: cliArgs.pass, EXPERIMENTAL: { - pubsub: cliArgs && cliArgs.enablePubsubExperiment, - dht: cliArgs && cliArgs.enableDhtExperiment, - sharding: cliArgs && cliArgs.enableShardingExperiment + pubsub: cliArgs.enablePubsubExperiment, + ipnsPubsub: cliArgs.enableNamesysPubsub, + dht: cliArgs.enableDhtExperiment, + sharding: cliArgs.enableShardingExperiment }, libp2p: libp2p }) diff --git a/test/cli/commands.js b/test/cli/commands.js index a14a31e826..9ba3f44519 100644 --- a/test/cli/commands.js +++ b/test/cli/commands.js @@ -4,7 +4,7 @@ const expect = require('chai').expect const runOnAndOff = require('../utils/on-and-off') -const commandCount = 82 +const commandCount = 86 describe('commands', () => runOnAndOff((thing) => { let ipfs diff --git a/test/cli/name-pubsub.js b/test/cli/name-pubsub.js new file mode 100644 index 0000000000..6b252f67a1 --- /dev/null +++ b/test/cli/name-pubsub.js @@ -0,0 +1,254 @@ +/* eslint max-nested-callbacks: ["error", 7] */ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) + +const parallel = require('async/parallel') +const series = require('async/series') +const ipfsExec = require('../utils/ipfs-exec') + +const DaemonFactory = require('ipfsd-ctl') +const df = DaemonFactory.create({ type: 'js' }) + +const checkAll = (bits) => string => bits.every(bit => string.includes(bit)) +const emptyDirCid = 'QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn' + +const spawnDaemon = (callback) => { + df.spawn({ + exec: `./src/cli/bin.js`, + args: ['--enable-namesys-pubsub'], + initOptions: { bits: 512 } + }, callback) +} + +describe('name-pubsub', () => { + describe('enabled', () => { + let ipfsA + let ipfsB + let nodeAId + let nodeBId + let bMultiaddr + let nodes = [] + + // Spawn daemons + before(function (done) { + // CI takes longer to instantiate the daemon, so we need to increase the + // timeout for the before step + this.timeout(80 * 1000) + + series([ + (cb) => { + spawnDaemon((err, node) => { + expect(err).to.not.exist() + ipfsA = ipfsExec(node.repoPath) + nodes.push(node) + cb() + }) + }, + (cb) => { + spawnDaemon((err, node) => { + expect(err).to.not.exist() + ipfsB = ipfsExec(node.repoPath) + nodes.push(node) + cb() + }) + } + ], done) + }) + + // Get node ids + before(function (done) { + parallel([ + (cb) => { + ipfsA('id').then((res) => { + nodeAId = JSON.parse(res) + cb() + }) + }, + (cb) => { + ipfsB('id').then((res) => { + const id = JSON.parse(res) + + nodeBId = id + bMultiaddr = id.addresses[0] + cb() + }) + } + ], done) + }) + + // Connect + before(function () { + return ipfsA('swarm', 'connect', bMultiaddr) + .then((out) => { + expect(out).to.eql(`connect ${bMultiaddr} success\n`) + }) + }) + + after((done) => parallel(nodes.map((node) => (cb) => node.stop(cb)), done)) + + describe('pubsub commands', () => { + it('should get enabled state of pubsub', function () { + return ipfsA('name pubsub state') + .then((res) => { + expect(res).to.exist() + expect(res).to.have.string('enabled') // enabled + }) + }) + + it('should subscribe on name resolve', function () { + this.timeout(80 * 1000) + + return ipfsB(`name resolve ${nodeAId.id}`) + .catch((err) => { + expect(err).to.exist() // Not available (subscribed) + + return ipfsB('pubsub ls') + }) + .then((res) => { + expect(res).to.exist() + expect(res).to.have.string('/record/') // have a record ipns subscribtion + + return ipfsB('name pubsub subs') + }) + .then((res) => { + expect(res).to.exist() + expect(res).to.have.string(`/ipns/${nodeAId.id}`) // have subscription + }) + }) + + it('should be able to cancel subscriptions', function () { + this.timeout(80 * 1000) + + return ipfsA(`name pubsub cancel /ipns/${nodeBId.id}`) + .then((res) => { + expect(res).to.exist() + expect(res).to.have.string('no subscription') // tried to cancel a not yet subscribed id + + return ipfsA(`name resolve ${nodeBId.id}`) + }) + .catch((err) => { + expect(err).to.exist() // Not available (subscribed now) + + return ipfsA(`name pubsub cancel /ipns/${nodeBId.id}`) + }) + .then((res) => { + expect(res).to.exist() + expect(res).to.have.string('canceled') // canceled now + + return ipfsA('pubsub ls') + }) + .then((res) => { + expect(res).to.exist() + expect(res).to.not.have.string('/ipns/') // ipns subscribtion not available + + return ipfsA('name pubsub subs') + }) + .then((res) => { + expect(res).to.exist() + expect(res).to.not.have.string(`/ipns/${nodeBId.id}`) // ipns subscribtion not available + }) + }) + }) + + describe('pubsub records', () => { + let cidAdded + + before(function (done) { + this.timeout(50 * 1000) + ipfsA('add src/init-files/init-docs/readme') + .then((out) => { + cidAdded = out.split(' ')[1] + done() + }) + }) + + it('should publish the received record to the subscriber', function () { + this.timeout(80 * 1000) + + return ipfsB(`name resolve ${nodeBId.id}`) + .then((res) => { + expect(res).to.exist() + expect(res).to.satisfy(checkAll([emptyDirCid])) // Empty dir received (subscribed) + + return ipfsA(`name resolve ${nodeBId.id}`) + }) + .catch((err) => { + expect(err).to.exist() // Not available (subscribed now) + + return ipfsB(`name publish ${cidAdded}`) + }) + .then((res) => { + // published to IpfsB and published through pubsub to ipfsa + expect(res).to.exist() + expect(res).to.satisfy(checkAll([cidAdded, nodeBId.id])) + + return ipfsB(`name resolve ${nodeBId.id}`) + }) + .then((res) => { + expect(res).to.exist() + expect(res).to.satisfy(checkAll([cidAdded])) + + return ipfsA(`name resolve ${nodeBId.id}`) + }) + .then((res) => { + expect(res).to.exist() + expect(res).to.satisfy(checkAll([cidAdded])) // value propagated to node B + }) + }) + }) + }) + + describe('disabled', () => { + let ipfsA + let nodes = [] + + // Spawn daemons + before(function (done) { + // CI takes longer to instantiate the daemon, so we need to increase the + // timeout for the before step + this.timeout(80 * 1000) + + df.spawn({ + exec: `./src/cli/bin.js`, + config: {}, + initOptions: { bits: 512 } + }, (err, node) => { + expect(err).to.not.exist() + ipfsA = ipfsExec(node.repoPath) + nodes.push(node) + done() + }) + }) + + after((done) => parallel(nodes.map((node) => (cb) => node.stop(cb)), done)) + + it('should get disabled state of pubsub', function () { + return ipfsA('name pubsub state') + .then((res) => { + expect(res).to.exist() + expect(res).to.have.string('disabled') + }) + }) + + it('should get error getting the available subscriptions', function () { + return ipfsA('name pubsub subs') + .catch((err) => { + expect(err).to.exist() // error as it is disabled + expect(err.toString()).to.have.string('IPNS pubsub subsystem is not enabled') + }) + }) + + it('should get error canceling a subscription', function () { + return ipfsA('name pubsub cancel /ipns/QmSWxaPcGgf4TDnFEBDWz2JnbHywF14phmY9hNcAeBEK5v') + .catch((err) => { + expect(err).to.exist() // error as it is disabled + expect(err.toString()).to.have.string('IPNS pubsub subsystem is not enabled') + }) + }) + }) +}) diff --git a/test/core/interface.spec.js b/test/core/interface.spec.js index 6fde7915c9..9d28ea3332 100644 --- a/test/core/interface.spec.js +++ b/test/core/interface.spec.js @@ -124,6 +124,13 @@ describe('interface-ipfs-core tests', () => { } })) + tests.namePubsub(CommonFactory.create({ + spawnOptions: { + args: ['--enable-namesys-pubsub'], + initOptions: { bits: 1024 } + } + })) + tests.object(defaultCommonFactory) tests.pin(defaultCommonFactory) diff --git a/test/core/name-pubsub.js b/test/core/name-pubsub.js new file mode 100644 index 0000000000..a2225392bd --- /dev/null +++ b/test/core/name-pubsub.js @@ -0,0 +1,85 @@ +/* eslint max-nested-callbacks: ["error", 6] */ +/* eslint-env mocha */ +'use strict' + +const hat = require('hat') +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) + +const parallel = require('async/parallel') + +const isNode = require('detect-node') +const IPFS = require('../../src') + +const DaemonFactory = require('ipfsd-ctl') +const df = DaemonFactory.create({ type: 'proc' }) + +const ipfsRef = '/ipfs/QmPFVLPmp9zv5Z5KUqLhe2EivAGccQW2r7M7jhVJGLZoZU' + +describe('name-pubsub', function () { + if (!isNode) { + return + } + + let nodes + let nodeA + let nodeB + let idA + + const createNode = (callback) => { + df.spawn({ + exec: IPFS, + args: [`--pass ${hat()}`, '--enable-namesys-pubsub'], + config: { Bootstrap: [] } + }, callback) + } + + before(function (done) { + this.timeout(40 * 1000) + + parallel([ + (cb) => createNode(cb), + (cb) => createNode(cb) + ], (err, _nodes) => { + expect(err).to.not.exist() + + nodes = _nodes + nodeA = _nodes[0].api + nodeB = _nodes[1].api + + parallel([ + (cb) => nodeA.id(cb), + (cb) => nodeB.id(cb) + ], (err, ids) => { + expect(err).to.not.exist() + + idA = ids[0] + nodeA.swarm.connect(ids[1].addresses[0], done) + }) + }) + }) + + after((done) => parallel(nodes.map((node) => (cb) => node.stop(cb)), done)) + + it('should publish and then resolve correctly', function (done) { + this.timeout(50 * 1000) + + nodeB.name.resolve(idA.id, (err) => { + expect(err).to.exist() + + nodeA.name.publish(ipfsRef, { resolve: false }, (err, res) => { + expect(err).to.not.exist() + expect(res).to.exist() + + nodeB.name.resolve(idA.id, (err, res) => { + expect(err).to.not.exist() + expect(res).to.exist() + expect(res.path).to.equal(ipfsRef) + done() + }) + }) + }) + }) +})