Skip to content
This repository has been archived by the owner on Aug 1, 2023. It is now read-only.

Commit

Permalink
test: add ipns pubsub tests
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Nov 5, 2018
1 parent 4085089 commit 9b9de01
Show file tree
Hide file tree
Showing 5 changed files with 403 additions and 17 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
"form-data": "^2.3.3",
"go-ipfs-dep": "~0.4.18",
"hat": "0.0.3",
"ipfs": "~0.33.0",
"ipfs": "ipfs/js-ipfs#feat/ipns-over-pubsub",
"ipfs-api": "^26.1.2",
"ipfs-unixfs": "~0.1.16",
"ipfsd-ctl": "~0.40.0",
Expand Down
376 changes: 376 additions & 0 deletions test/name-pubsub.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,376 @@
/* eslint-env mocha */
'use strict'

const chai = require('chai')
const dirtyChai = require('dirty-chai')
const expect = chai.expect
chai.use(dirtyChai)

const { fromB58String } = require('multihashes')

const parallel = require('async/parallel')
const retry = require('async/retry')
const series = require('async/series')

const IPFS = require('ipfs')
const DaemonFactory = require('ipfsd-ctl')

const waitFor = require('./utils/wait-for')

const config = {
Addresses: {
API: '/ip4/0.0.0.0/tcp/0',
Gateway: '/ip4/0.0.0.0/tcp/0',
Swarm: []
},
Bootstrap: []
}

const spawnJsDaemon = (callback) => {
// DaemonFactory.create({ type: 'js' })
DaemonFactory.create({ type: 'proc', exec: IPFS }) // TODO change after debuggin
.spawn({
disposable: true,
initOptions: { bits: 512 },
args: ['--enable-namesys-pubsub'], // enable ipns over pubsub
config
}, callback)
}

const spawnGoDaemon = (callback) => {
DaemonFactory.create()
.spawn({
disposable: true,
initOptions: { bits: 1024 },
args: ['--enable-namesys-pubsub'],
config
}, callback)
}

const subscribeToReceiveByPubsub = (nodeA, nodeB, nodeAId, callback) => {
const topic = `/ipns/${fromB58String(nodeAId.id).toString()}`
let subscribed = false

nodeB.api.name.resolve(nodeAId.id, (err) => {
expect(err).to.exist()

function checkMessage(msg) {
subscribed = true
}

series([
(cb) => waitForPeerToSubscribe(nodeB.api, cb),
(cb) => nodeB.api.pubsub.subscribe(topic, checkMessage, cb),
(cb) => nodeA.api.name.publish(ipfsRef, { resolve: false }, cb),
(cb) => nodeA.api.name.resolve(nodeAId.id, cb),
/* (cb) => waitFor(() => subscribed === true, cb), */ (cb) => setTimeout(() => cb(), 10000),
(cb) => nodeB.api.name.resolve(nodeAId.id, cb)
], (err, res) => {
expect(err).to.not.exist()
expect(res).to.exist()

expect(res[2].name).to.equal(nodeAId.id) // Published to Node A ID
expect(res[3].path || res[3]).to.equal(ipfsRef) // TODO: remove path once not using proc daemon
expect(res[5].path || res[5]).to.equal(ipfsRef)

callback()
})
})
}

const ipfsRef = '/ipfs/QmPFVLPmp9zv5Z5KUqLhe2EivAGccQW2r7M7jhVJGLZoZU'

describe.only('name-pubsub', () => {
describe('js nodes', () => {
let nodeAId
let nodes = []

// Spawn daemons
before(function (done) {
// CI takes longer to instantiate the daemon, so we need to increase the timeout
this.timeout(80 * 1000)

series([
(cb) => spawnJsDaemon(cb),
(cb) => spawnJsDaemon(cb),
], (err, daemons) => {
expect(err).to.not.exist()
nodes = daemons
done()
})
})

// Get node id
before(function (done) {
nodes[0].api.id((err, res) => {
expect(err).to.not.exist()
expect(res.id).to.exist()

nodeAId = res
nodes[1].api.swarm.connect(res.addresses[0], done)
})
})

after(function (done) {
this.timeout(60 * 1000)
parallel(nodes.map((node) => (cb) => node.stop(cb)), done)
})

it('should get enabled state of pubsub', function (done) {
nodes[0].api.name.pubsub.state((err, state) => {
expect(err).to.not.exist()
expect(state).to.exist()
expect(state.enabled).to.equal(true)

done()
})
})

it('should publish the received record to a js node subscriber', function (done) {
this.timeout(300 * 1000)

subscribeToReceiveByPubsub(nodes[0], nodes[1], nodeAId, done)
})
})

describe('go nodes', () => {
let nodeAId
let nodes = []

// Spawn daemons
before(function (done) {
// CI takes longer to instantiate the daemon, so we need to increase the timeout
this.timeout(80 * 1000)

series([
(cb) => spawnGoDaemon(cb),
(cb) => spawnGoDaemon(cb),
], (err, daemons) => {
expect(err).to.not.exist()
nodes = daemons

done()
})
})

// Connect nodes
before(function (done) {
nodes[0].api.id((err, res) => {
expect(err).to.not.exist()
expect(res.id).to.exist()

nodeAId = res
nodes[1].api.swarm.connect(res.addresses[0], done)
})
})

after(function (done) {
this.timeout(60 * 1000)
parallel(nodes.map((node) => (cb) => node.stop(cb)), done)
})

it('should get enabled state of pubsub', function (done) {
nodes[0].api.name.pubsub.state((err, state) => {
expect(err).to.not.exist()
expect(state).to.exist()
expect(state.enabled).to.equal(true)

done()
})
})

it('should publish the received record to a go node subscriber', function (done) {
this.timeout(300 * 1000)

subscribeToReceiveByPubsub(nodes[0], nodes[1], nodeAId, done)
})
})

describe.skip('bybrid nodes', () => {
let nodeAId
let nodeBId
let nodes = []

// Spawn daemons
before(function (done) {
// CI takes longer to instantiate the daemon, so we need to increase the timeout
this.timeout(80 * 1000)

series([
(cb) => spawnGoDaemon(cb),
(cb) => spawnJsDaemon(cb),
], (err, daemons) => {
expect(err).to.not.exist()
nodes = daemons

done()
})
})

// Get node ids
before(function (done) {
this.timeout(60 * 1000)

parallel([
(cb) => nodes[0].api.id(cb),
(cb) => nodes[1].api.id(cb)
], (err, ids) => {
expect(err).to.not.exist()
expect(ids).to.exist()
expect(ids[0].id).to.exist()
expect(ids[1].id).to.exist()

nodeAId = ids[0]
nodeBId = ids[1]

nodes[1].api.swarm.connect(ids[0].addresses[0], done)
})
})

after(function (done) {
this.timeout(60 * 1000)
parallel(nodes.map((node) => (cb) => node.stop(cb)), done)
})

it('should get enabled state of pubsub', function (done) {
nodes[0].api.name.pubsub.state((err, state) => {
expect(err).to.not.exist()
expect(state).to.exist()
expect(state.enabled).to.equal(true)

done()
})
})

it('should publish the received record to a go node and a js subscriber should receive it', function (done) {
this.timeout(250 * 1000)
const topic = `/ipns/${fromB58String(nodeAId.id).toString()}`
let subscribed = false

nodes[1].api.name.resolve(nodeAId.id, (err) => {
expect(err).to.exist()

function checkMessage(msg) {
console.log('msg received', msg)
subscribed = true
}

series([
(cb) => waitForPeerToSubscribe(nodes[1].api, cb),
(cb) => nodes[1].api.pubsub.subscribe(topic, checkMessage, cb),
(cb) => nodes[0].api.name.publish(ipfsRef, { resolve: false }, cb),
(cb) => nodes[0].api.name.resolve(nodeAId.id, cb),
// (cb) => waitFor(() => subscribed === true, 50 * 1000, cb),
(cb) => setTimeout(() => cb(), 10000),
(cb) => nodes[1].api.name.resolve(nodeAId.id, cb)
], (err, res) => {
console.log('res', res)
console.log('err', err)
expect(err).to.not.exist()
expect(res).to.exist()

expect(res[2].name).to.equal(nodeAId.id) // Published to Node A ID
expect(res[3].path).to.equal(ipfsRef)
expect(res[5].path).to.equal(ipfsRef)
done()
})
})
})

it('should publish the received record to a js node and a go subscriber should receive it', function (done) {
this.timeout(350 * 1000)
const topic = `/ipns/${fromB58String(nodeBId.id).toString()}`
let subscribed = false

nodes[0].api.name.resolve(nodeBId.id, (err, res) => {
expect(err).to.exist()

function checkMessage(msg) {
console.log('msg received')
subscribed = true
}

series([
(cb) => waitForPeerToSubscribe(nodes[0].api, cb),
// (cb) => nodes[0].api.pubsub.subscribe(topic, checkMessage, cb),
(cb) => nodes[1].api.name.publish(ipfsRef, { resolve: false }, cb),
(cb) => setTimeout(cb, 30000),
(cb) => nodes[1].api.name.resolve(nodeBId.id, cb),
// (cb) => waitFor(() => subscribed === true, 50 * 1000, cb),
(cb) => nodes[0].api.name.resolve(nodeBId.id, cb)
], (err, res) => {
console.log('res', res)
console.log('err', err)
expect(err).to.not.exist()
expect(res).to.exist()

expect(res[2].name).to.equal(nodeBId.id) // Published to Node A ID
expect(res[3]).to.equal(ipfsRef)
expect(res[5]).to.equal(ipfsRef)
done()
})
})
})
})
})

// Wait until a peer subscribes a topic
const waitForPeerToSubscribe = (daemon, callback) => {
retry({
times: 5,
interval: 2000
}, (next) => {
daemon.name.pubsub.subs((error, res) => {
if (error) {
return next(error)
}

if (!res || !res.length) {
return next(new Error('Could not find subscription'))
}

return next(null, res[0])
})
}, callback)
}

/*
// Wait until a peer subscribes a topic
const waitForPeerToSubscribe2 = (daemon, callback) => {
retry({
times: 5,
interval: 2000
}, (next) => {
daemon.pubsub.ls((error, res) => {
if (error) {
return next(error)
}
if (!res || !res.length) {
return next(new Error('Could not find subscription'))
}
return next(null, res[0])
})
}, callback)
}
const waitForPeerKnowingOfTheSubscription = (daemon, subscription, callback) => {
retry({
times: 5,
interval: 2000
}, (next) => {
daemon.pubsub.peers(subscription, (error, res) => {
if (error) {
return next(error)
}
if (!res || !res.length) {
return next(new Error('Peer did not subscribe'))
}
return next(null, res[0])
})
}, callback)
}
*/
1 change: 1 addition & 0 deletions test/node.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ require('./ipns')
require('./kad-dht')
require('./pin')
require('./files')
require('./name-pubsub')
Loading

0 comments on commit 9b9de01

Please sign in to comment.