-
Notifications
You must be signed in to change notification settings - Fork 3
/
simple-rpc.js
260 lines (233 loc) · 7.02 KB
/
simple-rpc.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
/**
* This RPC has 2 remote procedures.
* First is "blocks" which sends one-or-more feeds to remote peer.
* Second is "query" which expects remote peer to respond with "blocks".
* Feel free to copy and modify if you need something different. (AGPL)
*/
import { Hub } from 'piconet'
import { Feed, au8, cmp, feedFrom } from 'picofeed'
import { encode, decode } from 'cborg'
import { write } from 'piconuro'
function randomBytes (n) {
const b = new Uint8Array(n)
globalThis.crypto.getRandomValues(b)
return b
}
// Message Types
const OK = 0
const ERR = -1
const QUERY = 1
const BLOCKS = 2
const NODE_ID = 3
export class SimpleRPC {
/** Handlers spec:
* {
* // When peer conects
* onconnect: send => { ... }
*
* // When peer asks for blocks
* onquery: async query => { return Feed || Feeds[] }
*
* // When blocks received from peer
* onblocks: async feed => { ... }
*
* // When peer disconnects
* ondisconnect: peer => { ... }
* }
*/
constructor (handlers) {
this.nodeId = randomBytes(8)
this._controller = this._controller.bind(this)
this.hub = new Hub(
(...a) => this._controller(...a),
(...a) => this._ondisconnect(...a)
)
this.handlers = handlers
// Export connections neuron
const [$n, set] = write([])
this._setConnections = set
this.$connections = $n
}
spawnWire () {
// Internal 'onconnect'
const plug = this.hub.createWire(hubEnd => {
// Exchange node Id
hubEnd.postMessage(encodeMsg(NODE_ID, this.nodeId), true)
.then(([msg, reply]) => {
if (decodeMsg(msg).type !== OK) return // dupe-peer, wire will close
this._setConnections(Array.from(this.hub._nodes))
// Exchange done, continue with application handshake
return this.handlers.onconnect(hubEnd)
})
.catch(err => console.error('Handshake failed', err))
})
return plug
}
_ondisconnect (...a) {
this._setConnections(Array.from(this.hub._nodes))
this.handlers.ondisconnect(...a)
}
async _controller (node, msg, replyTo) {
try {
if (!msg.length) { // TODO: delete, this issue has been adressed
console.warn('Zero length message received')
return
}
const { type, data } = decodeMsg(msg)
switch (type) {
// Download blocks
case BLOCKS:
for await (const blocks of this._downloadFeeds(msg, replyTo)) {
const patch = await this.handlers.onblocks(blocks)
// Gossip if our store accepted the blocks
if (patch) {
this.shareBlocks(patch, node) // second param prevents echo
.catch(err => console.error('Gossip failed', err))
}
}
break
// Respond to queries
case QUERY: {
const feeds = await this.handlers.onquery(data)
// Upload feeds to peer
this._uploadFeeds(replyTo, feeds)
.catch(err => console.error('Failed uploading blocks', err))
} break
case NODE_ID:
// Prevent accidental duplicate peer connections
for (const n of this.hub._nodes) {
if (n.id && cmp(n.id, data)) {
// Ignore dupe-ack error handling as conection is about
// to be killed anyway
replyTo(encodeMsg(ERR))
return node.close(new Error('DuplicatePeer')) // dedupe peers
}
}
node.id = data
await replyTo(encodeMsg(OK))
break
default:
throw new Error(`Unknown message type: ${type}`)
}
} catch (err) {
console.error('RPC:internal error', err)
}
}
// Shares blocks to all connected peers
async shareBlocks (feeds, filter) {
if (Feed.isFeed(feeds)) feeds = [feeds]
feeds = [...feeds]
const first = feeds.shift()
const iterator = this.hub.survey(encodeMsg(BLOCKS, first), !!feeds.length, filter)
for await (const [msg, sink] of iterator) {
if (!feeds.length) continue // one feed sent, that's it.
if (typeOfMsg(msg) === OK && sink) {
this._uploadFeeds(sink, feeds)
.catch(err => console.error('Failed sending subsequent feeds', err))
} else {
console.warn('Remote rejected subsequent blocks', typeOfMsg(msg))
}
}
}
async query (node, params = {}) {
const [msg, reply] = await node.postMessage(encodeMsg(QUERY, params), true)
if (decodeMsg(msg).type === OK) return // Empty reply
// Let the controller handle the rest
this._controller(node, msg, reply)
}
// Recursively uploads feeds until array
// is empty or aborted by remote peer
_uploadFeeds (sink, feeds) {
if (!feeds?.length) return sink(encodeMsg(OK))
const remaining = [...feeds]
const current = remaining.shift()
return sink(encodeMsg(BLOCKS, current), !!remaining.length)
.then(([msg, next]) => {
if (!remaining.length) return // It's done
if (!msg) return console.warn('Empty scope')
if (typeOfMsg(msg) !== OK) return console.warn('Peer rejected subsequent blocks', typeOfMsg(msg))
return this._uploadFeeds(next, remaining)
})
}
async * _downloadFeeds (msg, sink) {
let done = false
while (!done) {
done = true
const { type, data } = decodeMsg(msg)
if (type === BLOCKS) yield data
else if (sink) {
console.warn('Expected BLOCKS but got', type)
await sink(encodeMsg(ERR), false)
continue
}
if (!sink) continue // End of stream
// Download one more feed
// updating msg and sink then do another loop.
const scope = await sink(encodeMsg(OK), true)
msg = scope[0]
sink = scope[1]
done = false
}
}
}
function encodeMsg (type, obj) {
let buffer = null
switch (type) {
case BLOCKS:
if (!obj) throw new Error('Feed expected')
obj = Feed.from(obj)
buffer = new Uint8Array(obj.tail + 1)
buffer.set(obj.buffer, 1)
break
// Serialize signals
case ERR:
case OK:
buffer = new Uint8Array(1)
break
// Serialize msgencode messages
case QUERY: {
const data = encode(obj)
buffer = new Uint8Array(data.length + 1)
buffer.set(data, 1)
} break
case NODE_ID:
buffer = new Uint8Array(8 + 1)
buffer.set(obj, 1)
break
default:
throw new Error('UnknownMessageType: ' + type)
}
buffer[0] = type
return buffer
}
function typeOfMsg (buffer) {
au8(buffer)
return buffer[0]
}
function decodeMsg (buffer) {
au8(buffer)
const type = typeOfMsg(buffer)
let data = null
switch (type) {
case BLOCKS:
data = feedFrom(buffer.subarray(1), false)
break
// deserialize signals
case OK:
data = 'OK'
break
case ERR:
data = 'ERROR'
break
// deserialize messages
case QUERY:
data = decode(buffer.subarray(1))
break
case NODE_ID:
data = buffer.subarray(1, 9)
break
default:
throw new Error('UnknownMessageType: ' + type)
}
return { type, data }
}