diff --git a/package.json b/package.json index 4757f2b6..7f851324 100644 --- a/package.json +++ b/package.json @@ -37,6 +37,7 @@ "devDependencies": { "aegir": "^18.1.0", "async": "^2.6.2", + "base64url": "^3.0.0", "bl": "^3.0.0", "bs58": "^4.0.1", "chai": "^4.2.0", @@ -55,12 +56,14 @@ "ipfs-repo": "~0.26.1", "ipfs-unixfs": "~0.1.16", "ipfsd-ctl": "~0.42.2", + "ipns": "~0.5.1", "is-ci": "^2.0.0", "left-pad": "^1.3.0", "libp2p-websocket-star-rendezvous": "~0.3.0", "lodash": "^4.17.11", "merge-options": "^1.0.1", "mocha": "^5.2.0", + "multihashes": "~0.4.14", "ncp": "^2.0.0", "pretty-bytes": "^5.1.0", "random-fs": "^1.0.3", diff --git a/test/ipns-pubsub.js b/test/ipns-pubsub.js new file mode 100644 index 00000000..d852e22f --- /dev/null +++ b/test/ipns-pubsub.js @@ -0,0 +1,174 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) + +const { fromB58String } = require('multihashes') +const base64url = require('base64url') +const ipns = require('ipns') + +const parallel = require('async/parallel') +const retry = require('async/retry') +const series = require('async/series') + +const DaemonFactory = require('ipfsd-ctl') + +const waitFor = require('./utils/wait-for') + +const config = { + Addresses: { + API: '/ip4/0.0.0.0/tcp/0', + Gateway: '/ip4/0.0.0.0/tcp/0', + Swarm: [] + } +} + +const namespace = '/record/' + +const spawnJsDaemon = (callback) => { + DaemonFactory.create({ type: 'js' }) + .spawn({ + disposable: true, + args: ['--enable-namesys-pubsub'], // enable ipns over pubsub + config + }, callback) +} + +const spawnGoDaemon = (callback) => { + DaemonFactory.create() + .spawn({ + disposable: true, + args: ['--enable-namesys-pubsub'], // enable ipns over pubsub + config + }, callback) +} + +const ipfsRef = '/ipfs/QmPFVLPmp9zv5Z5KUqLhe2EivAGccQW2r7M7jhVJGLZoZU' + +describe('ipns-pubsub', () => { + let nodeAId + let nodeBId + let nodes = [] + + // Spawn daemons + before(function (done) { + // CI takes longer to instantiate the daemon, so we need to increase the timeout + this.timeout(80 * 1000) + + series([ + (cb) => spawnGoDaemon(cb), + (cb) => spawnJsDaemon(cb) + ], (err, daemons) => { + expect(err).to.not.exist() + nodes = daemons + + done() + }) + }) + + // Get node ids + before(function (done) { + this.timeout(100 * 1000) + + parallel([ + (cb) => nodes[0].api.id(cb), + (cb) => nodes[1].api.id(cb) + ], (err, ids) => { + expect(err).to.not.exist() + expect(ids).to.exist() + expect(ids[0].id).to.exist() + expect(ids[1].id).to.exist() + + nodeAId = ids[0] + nodeBId = ids[1] + + nodes[0].api.swarm.connect(ids[1].addresses[0], (err) => { + expect(err).to.not.exist() + + console.log('wait for republish as we can receive the republish message first') + setTimeout(done, 60000) // wait for republish as we can receive the republish message first + }) + }) + }) + + after(function (done) { + this.timeout(60 * 1000) + parallel(nodes.map((node) => (cb) => node.stop(cb)), done) + }) + + it('should get enabled state of pubsub', function (done) { + nodes[0].api.name.pubsub.state((err, state) => { + expect(err).to.not.exist() + expect(state).to.exist() + expect(state.enabled).to.equal(true) + + done() + }) + }) + + it('should publish the received record to a go node and a js subscriber should receive it', function (done) { + this.timeout(300 * 1000) + + subscribeToReceiveByPubsub(nodes[0], nodes[1], nodeAId.id, done) + }) + + it('should publish the received record to a js node and a go subscriber should receive it', function (done) { + this.timeout(350 * 1000) + + subscribeToReceiveByPubsub(nodes[1], nodes[0], nodeBId.id, done) + }) +}) + +const subscribeToReceiveByPubsub = (nodeA, nodeB, id, callback) => { + let subscribed = false + function checkMessage (msg) { + subscribed = true + } + + const keys = ipns.getIdKeys(fromB58String(id)) + const topic = `${namespace}${base64url.encode(keys.routingKey.toBuffer())}` + + // try to resolve a unpublished record (will subscribe it) + nodeB.api.name.resolve(id, (err) => { + expect(err).to.exist() // not found + + series([ + (cb) => waitForPeerToSubscribe(nodeB.api, topic, cb), + (cb) => nodeB.api.pubsub.subscribe(topic, checkMessage, cb), + (cb) => nodeA.api.name.publish(ipfsRef, { resolve: false }, cb), + (cb) => waitFor(() => subscribed === true, (50 * 1000), cb), + (cb) => nodeB.api.name.resolve(id, cb) + ], (err, res) => { + expect(err).to.not.exist() + expect(res).to.exist() + + expect(res[2].name).to.equal(id) // Published to Node A ID + expect(res[4]).to.equal(ipfsRef) + + callback() + }) + }) +} + +// Wait until a peer subscribes a topic +const waitForPeerToSubscribe = (daemon, topic, callback) => { + retry({ + times: 5, + interval: 2000 + }, (next) => { + daemon.pubsub.ls((error, res) => { + if (error) { + return next(error) + } + + if (!res || !res.length || !res.includes(topic)) { + return next(new Error('Could not find subscription')) + } + + return next(null, res[0]) + }) + }, callback) +} diff --git a/test/node.js b/test/node.js index 4edb14ea..11c86c90 100644 --- a/test/node.js +++ b/test/node.js @@ -10,3 +10,4 @@ require('./ipns') require('./kad-dht') require('./pin') require('./files') +require('./ipns-pubsub') diff --git a/test/utils/wait-for.js b/test/utils/wait-for.js new file mode 100644 index 00000000..60fd9efd --- /dev/null +++ b/test/utils/wait-for.js @@ -0,0 +1,24 @@ +'use strict' + +/* + * Wait for a condition to become true. When its true, callback is called. + */ +module.exports = (predicate, ttl, callback) => { + if (typeof ttl === 'function') { + callback = ttl + ttl = Date.now() + (10 * 1000) + } else { + ttl = Date.now() + ttl + } + + const self = setInterval(() => { + if (predicate()) { + clearInterval(self) + return callback() + } + if (Date.now() > ttl) { + clearInterval(self) + return callback(new Error('waitFor time expired')) + } + }, 50) +}