This repository has been archived by the owner on Aug 1, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 15
/
ipns-pubsub.js
137 lines (114 loc) · 4.52 KB
/
ipns-pubsub.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
/* eslint-env mocha */
'use strict'
const { fromB58String } = require('multihashes')
const base64url = require('base64url')
const ipns = require('ipns')
const delay = require('delay')
const last = require('it-last')
const pRetry = require('p-retry')
const waitFor = require('./utils/wait-for')
const { expect } = require('./utils/chai')
const daemonFactory = require('./utils/daemon-factory')
const daemonsOptions = {
args: ['--enable-namesys-pubsub'] // enable ipns over pubsub
}
const retryOptions = {
retries: 5
}
const namespace = '/record/'
const ipfsRef = '/ipfs/QmPFVLPmp9zv5Z5KUqLhe2EivAGccQW2r7M7jhVJGLZoZU'
describe('ipns-pubsub', function () {
this.timeout(350 * 1000)
let nodes = []
// Spawn daemons
before(async function () {
nodes = await Promise.all([
daemonFactory.spawn({
type: 'go',
test: true,
...daemonsOptions
}),
daemonFactory.spawn({
type: 'js',
test: true,
...daemonsOptions
}),
// TODO: go-ipfs needs two nodes in the DHT to be able to publish a record
// Remove this when js-ipfs has a DHT
daemonFactory.spawn({
type: 'go',
test: true,
...daemonsOptions
})
])
})
// Connect nodes and wait for republish
before(async function () {
await nodes[0].api.swarm.connect(nodes[1].api.peerId.addresses[0])
// TODO: go-ipfs needs two nodes in the DHT to be able to publish a record
// Remove this when js-ipfs has a DHT
await nodes[0].api.swarm.connect(nodes[2].api.peerId.addresses[0])
console.log('wait for republish as we can receive the republish message first') // eslint-disable-line
await delay(60000)
})
after(() => daemonFactory.clean())
it('should get enabled state of pubsub', async function () {
for (const node of nodes) {
const state = await node.api.name.pubsub.state()
expect(state).to.exist()
expect(state.enabled).to.equal(true)
}
})
it('should publish the received record to a go node and a js subscriber should receive it', async function () {
this.timeout(300 * 1000)
// TODO find out why JS doesn't resolve, might be just missing a DHT
await expect(last(nodes[1].api.name.resolve(nodes[0].api.peerId.id, { stream: false }))).to.eventually.be.rejected.with(/was not found in the network/)
await subscribeToReceiveByPubsub(nodes[0], nodes[1], nodes[0].api.peerId.id, nodes[1].api.peerId.id)
})
it('should publish the received record to a js node and a go subscriber should receive it', async function () {
this.timeout(350 * 1000)
await last(nodes[0].api.name.resolve(nodes[1].api.peerId.id, { stream: false }))
await subscribeToReceiveByPubsub(nodes[1], nodes[0], nodes[1].api.peerId.id, nodes[0].api.peerId.id)
})
})
// * IPNS resolve subscription test
// * 1) name.resolve() , which subscribes the topic
// * 2) wait to guarantee the subscription
// * 3) subscribe again just to know until when to wait (inside the scope of the test)
// * 4) wait for the other peer to get notified of the subscription
// * 5) publish new ipns record
// * 6) wait until the record is received in the test scope subscribe
// * 7) resolve ipns record
const subscribeToReceiveByPubsub = async (nodeA, nodeB, idA, idB) => {
let subscribed = false
function checkMessage (msg) {
subscribed = true
}
const keys = ipns.getIdKeys(fromB58String(idA))
const topic = `${namespace}${base64url.encode(keys.routingKey.toBuffer())}`
await waitForPeerToSubscribe(nodeB.api, topic)
await nodeB.api.pubsub.subscribe(topic, checkMessage)
await waitForNotificationOfSubscription(nodeA.api, topic, idB)
const res1 = await nodeA.api.name.publish(ipfsRef, { resolve: false })
await waitFor(() => subscribed === true, (50 * 1000))
const res2 = await last(nodeB.api.name.resolve(idA))
expect(res1.name).to.equal(idA) // Published to Node A ID
expect(res2).to.equal(ipfsRef)
}
// wait until a peer know about other peer to subscribe a topic
const waitForNotificationOfSubscription = (daemon, topic, peerId) => pRetry(async () => {
const res = await daemon.pubsub.peers(topic)
if (!res || !res.length || !res.includes(peerId)) {
throw new Error('Could not find peer subscribing')
}
}, retryOptions)
// Wait until a peer subscribes a topic
const waitForPeerToSubscribe = async (daemon, topic) => {
await pRetry(async () => {
const res = await daemon.pubsub.ls()
if (!res || !res.length || !res.includes(topic)) {
throw new Error('Could not find subscription')
}
return res[0]
}, retryOptions)
}