forked from ssbc/epidemic-broadcast-trees
-
Notifications
You must be signed in to change notification settings - Fork 0
/
events.js
489 lines (416 loc) · 16 KB
/
events.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
'use strict'
module.exports = function (version) {
const { note, getReceive, getReplicate, getSequence } = version
const exports = {
note,
getReceive,
getReplicate,
getSequence
}
function isEmpty (o) {
if (o == null) return true
return Object.keys(o).length === 0
}
function isObject (o) {
return o && typeof o === 'object'
}
function isBlocked (state, id, target) {
return state.blocks[id] && state.blocks[id][target]
}
function isShared (state, id, peerId) {
return state.follows[id] && !isBlocked(state, id, peerId)
}
// check if a feed is already being replicated on another peer from ignoreId
function isAlreadyReplicating (state, feedId, ignoreId) {
for (const id in state.peers) {
if (id !== ignoreId) {
const peer = state.peers[id]
if (peer.notes && getReceive(peer.notes[id])) return id
// for replicating the node must have replicated something not just rx
// this fixed a partial replication bug where a node is unable to send the full log
const idHasSent = peer.replicating && peer.replicating[id] && peer.replicating[id].sent !== -1
if (peer.replicating && peer.replicating[feedId] && peer.replicating[feedId].rx && idHasSent) return id
}
}
return false
}
// lower numbers should be respected
// if one is -1 and the other is not, use the other
function fixSeq (local, remote) {
if (local == null) return remote
if (local === -1 || remote === -1) return remote
if (remote === 0) return 0
if (remote > 0 && local > 0 && remote < local) return remote
return Math.max(local, remote)
}
// check if a feed is available from a peer apart from ignoreId
function isAvailable (state, feedId, ignoreId) {
for (const peerId in state.peers) {
if (peerId !== ignoreId) {
const peer = state.peers[peerId]
// BLOCK: check wether id has blocked this peer
if (
((peer.clock && peer.clock[feedId]) || 0) > (state.clock[feedId] || 0) &&
isShared(state, feedId, peerId)
) return true
}
}
}
// jump to a particular key in a list, then iterate from there
// back around to the key. this is used for switching away from
// peers that stall so that you'll rotate through all the peers
// not just swich between two different peers.
function eachFrom (keys, key, iter) {
const i = keys.indexOf(key)
if (!~i) return
// start at 1 because we want to visit all keys but key.
for (let j = 1; j < keys.length; j++) {
if (iter(keys[(j + i) % keys.length], j)) { return }
}
}
function setNotes (peer, feed, seq, rx) {
peer.notes = peer.notes || {}
peer.notes[feed] = note(seq, rx)
const rep = peer.replicating[feed]
if (rep) {
rep.rx = rx
rep.requested = seq
}
}
// defaults for backwards compatibility
exports.getMsgAuthor = function (msg) {
return msg.author
}
exports.getMsgSequence = function (msg) {
return msg.sequence
}
exports.initialize = function (id, getMsgAuthor, getMsgSequence) {
if (getMsgAuthor) { exports.getMsgAuthor = getMsgAuthor }
if (getMsgSequence) { exports.getMsgSequence = getMsgSequence }
return {
id,
clock: null,
follows: {},
blocks: {},
peers: {},
receive: []
}
}
exports.clock = function (state, clock) {
state.clock = clock
return state
}
exports.connect = function (state, ev) {
if (state.peers[ev.id]) throw new Error('already connected to peer:' + ev.id)
if (typeof ev.client !== 'boolean') throw new Error('connect.client must be boolean')
// if (isBlocked(state, state.id, ev.id)) return state
state.peers[ev.id] = {
blocked: isBlocked(state, state.id, ev.id),
clock: null,
client: !!ev.client,
msgs: [],
retrive: [],
notes: null,
// if we are client, wait until we receive notes to send code.
// this is a weird way of doing it! shouldn't we just have a bit of state
// for wether we have received a vector clock
replicating: ev.client ? null : {}
}
return state
}
exports.disconnect = function (state, ev) {
delete state.peers[ev.id]
return state
}
// this is when the stored peer clock has been loaded from the local database.
// note, this must be handled before any messages are received.
exports.peerClock = function (state, ev) {
if (!state.peers[ev.id]) { throw new Error('peerClock called for:' + ev.id + ' but only connected to:' + Object.keys(state.peers)) }
const peer = state.peers[ev.id]
const clock = peer.clock = ev.value
// client should wait for the server notes, so that stream
// can error before a peer sends a massive handshake.
if (peer.replicating == null) return state
// always set an empty clock here, so that if we don't have anything
// to send, we still send this empty clock. This only happens on a new connection.
// in every other situation, clock is only sent if there is something in it.
peer.notes = peer.notes || {}
// iterate over following and create replications.
// if we want to replicate a peer that has changed since their clock,
// create a replication for that peer.
for (const id in state.follows) {
const seq = clock[id] || 0; const lseq = state.clock[id] || 0
// BLOCK: check wether id has blocked this peer
if (isShared(state, id, ev.id) && seq !== -1 && seq !== state.clock[id]) {
// if we are already replicating, and this feed is at zero, ask for it anyway,
// XXX if a feed is at zero, but we are replicating on another peer
// just don't ask for it yet?
const replicating = isAlreadyReplicating(state, id, ev.id)// && lseq
peer.replicating = peer.replicating || {}
peer.replicating[id] = {
tx: false,
rx: !replicating,
sent: null,
requested: lseq
}
setNotes(peer, id, lseq, !replicating)
}
}
return state
}
// XXX handle replicating with only one peer.
exports.follow = function (state, ev) {
// set to true once we have asked for this feed from someone.
let replicating = false
if (!!state.follows[ev.id] !== ev.value) {
state.follows[ev.id] = ev.value
for (const id in state.peers) {
const peer = state.peers[id]
if (!peer.clock || !peer.replicating || !isShared(state, ev.id, id)) continue
// BLOCK: check wether this feed has has blocked this peer.
// ..... don't replicate feeds with peers that have blocked them at all?
// cases:
// don't have feed
// do have feed
// peer has feed
// peer rejects feed
const seq = peer.clock[ev.id] || 0; const lseq = state.clock[ev.id] || 0
if (seq === -1) {
// peer explicitly does not replicate this feed, don't ask for it.
} else if (ev.value === false) { // unfollow
setNotes(peer, ev.id, -1, false)
} else if (ev.value === true && seq !== state.clock[ev.id]) {
peer.replicating[ev.id] = {
rx: true,
tx: false,
sent: -1,
requested: lseq
}
setNotes(peer, ev.id, lseq, !replicating)
replicating = true
}
}
}
return state
}
exports.retrive = function (state, msg) {
// check if any peer requires this msg
for (const id in state.peers) {
const peer = state.peers[id]
if (!peer.replicating) continue
// BLOCK: check wether id has blocked this peer
const author = exports.getMsgAuthor(msg)
const sequence = exports.getMsgSequence(msg)
const rep = peer.replicating[author]
if (rep && rep.tx && rep.sent === sequence - 1) {
rep.sent++
peer.msgs.push(msg)
if (rep.sent < state.clock[author]) {
// use continue, not return because we still need to loop through other peers.
if (~peer.retrive.indexOf(author)) continue
peer.retrive.push(author)
}
}
}
return state
}
function isAhead (seq1, seq2) {
if (seq2 === -1) return false
if (seq2 == null) return true
if (seq1 > seq2) return true
}
exports.append = function (state, msg) {
const author = exports.getMsgAuthor(msg)
const sequence = exports.getMsgSequence(msg)
// check if any peer requires this msg
if (state.clock[author] != null && state.clock[author] !== sequence - 1) { return state } // ignore
const lseq = state.clock[author] = sequence
for (const id in state.peers) {
const peer = state.peers[id]
if (!peer.clock || !peer.replicating || !isShared(state, author, id)) continue
// BLOCK: check wether msg.author has blocked this peer
const seq = peer.clock[author]
const rep = peer.replicating[author]
if (rep && rep.tx && rep.sent === lseq - 1 && lseq > seq) {
peer.msgs.push(msg)
rep.sent++
} // eslint-disable-line
// if we are ahead of this peer, and not in tx mode, let them know that.
else if (
isAhead(lseq, seq) &&
(rep ? !rep.tx && rep.sent != null : state.follows[author])
) { setNotes(peer, author, sequence, false) }
}
return state
}
// XXX if we only receive from a single peer,
// then we shouldn't really get known messages?
// except during the race when we have disabled a peer
// but they havn't noticed yet.
exports.receive = function (state, ev) {
const msg = ev.value
// receive a message, validate and append.
// if this message is forked, disable this feed
if (!state.peers[ev.id]) throw new Error('lost peer state:' + ev.id)
// we _know_ that this peer is upto at least this message now.
// (but maybe they already told us they where ahead further)
const author = exports.getMsgAuthor(msg)
const sequence = exports.getMsgSequence(msg)
const peer = state.peers[ev.id]
const rep = peer.replicating[author]
// if we havn't asked for this, ignore it. (this is remote speaking protocol wrong!)
if (!rep) return state
peer.clock[author] = Math.max(peer.clock[author], sequence)
rep.sent = Math.max(rep.sent, sequence)
// if this message has already been seen, ignore.
if (state.clock[author] >= sequence) {
if (rep.rx) {
setNotes(peer, author, state.clock[author], false)
}
// XXX activate some other peer?
return state
}
// remember the time of the last message received
state.peers[ev.id].ts = ev.ts
// FORKS ignore additional messages if we have already found an invalid one.
if (isShared(state, author, ev.id)) { state.receive.push(ev) }
// Q: possibly update the receiving mode?
return state
}
// XXX check if we are already receiving a feed
// and if so put this into lazy mode.
exports.notes = function (state, ev) {
// update replicating modes
let clock = ev.value
// support sending clocks inside a thing with additional properties.
// this is to allow room for backwards compatible upgrades.
if (isObject(ev.value.clock)) { clock = ev.value.clock }
const peer = state.peers[ev.id]
if (!peer) throw new Error('lost state of peer:' + ev.id)
if (!peer.clock) throw new Error("received notes, but has not set the peer's clock yet")
let count = 0
// if we are client, and this is the first notes we receive
if (!peer.replicating) {
peer.replicating = {}
state = exports.peerClock(state, { id: ev.id, value: state.peers[ev.id].clock })
}
for (const id in clock) {
count++
const seq = peer.clock[id] = fixSeq(peer.clock[id], getSequence(clock[id]))
const tx = getReceive(clock[id]) // is even
const isReplicate = getReplicate(clock[id]) // !== -1
const lseq = state.clock[id] || 0
// check if we are not following this feed.
// BLOCK: or wether id has blocked this peer
if (!isShared(state, id, ev.id)) {
if (!peer.replicating[id]) { setNotes(peer, id, -1, false) }
peer.replicating[id] = { tx: false, rx: false, sent: -1, requested: -1 }
} else {
let rep = peer.replicating[id]
const replicating = isAlreadyReplicating(state, id, ev.id)
if (!rep) {
rep = peer.replicating[id] = {
tx: true,
rx: true,
sent: seq,
requested: lseq
}
setNotes(peer, id, lseq, lseq < seq && !replicating)
} else if (!rep.rx && seq > lseq) {
if (!replicating) {
peer.ts = ev.ts // remember ts, so we can switch this feed if necessary
setNotes(peer, id, lseq, true)
} else {
// if we are already replicating this via another peer
// switch to this peer if it is further ahead.
// (todo?: switch if the other peer's timestamp is old?)
const _peer = state.peers[replicating]
// note: _peer.clock[id] may be undefined, if we have
// just connected to them and sent our notes but not
// received theirs.
if (seq > (_peer.clock[id] || 0)) {
peer.ts = ev.ts
setNotes(peer, id, lseq, true)
setNotes(_peer, id, lseq, false) // deactivate the previous peer
}
}
}
// positive seq means "send this to me please"
rep.tx = tx
// in the case we are already ahead, get ready to send them messages.
rep.sent = seq
if (lseq > seq) {
if (tx) peer.retrive.push(id)
else if (isReplicate) setNotes(peer, id, lseq, rep.rx)
}
}
}
peer.recvNotes = (peer.recvNotes || 0) + count
return state
}
exports.timeout = function (state, ev) {
const want = {}
for (const peerId in state.peers) {
const peer = state.peers[peerId]
// check if the peer hasn't received a message recently.
// if we havn't received a message from this peer recently
if ((peer.ts || 0) + state.timeout < ev.ts) {
// check if they have claimed a higher sequence, but not sent us
for (const id in peer.replicating) {
const rep = peer.replicating[id]
// if yes, prepare to switch this feed to that peer
if (rep.rx && isAvailable(state, id, peerId)) {
want[id] = peerId
setNotes(peer, id, state.clock[id], false)
}
}
}
}
const peerIds = Object.keys(state.peers)
for (const feedId in want) {
const ignoreId = want[feedId]
eachFrom(peerIds, ignoreId, function (peerId) {
const peer = state.peers[peerId]
if ((peer.clock && peer.clock[feedId]) || state.clock[feedId] < 0 || 0) {
peer.replicating = peer.replicating || {}
peer.replicating[feedId] = peer.replicating[feedId] || {
tx: false, rx: true, sent: -1, requested: state.clock[feedId]
}
setNotes(peer, feedId, state.clock[feedId], true)
peer.ts = ev.ts
// returning true triggers the end of eachFrom
return true
}
})
}
return state
}
exports.block = function (state, ev) {
if (!ev.value) {
if (state.blocks[ev.id]) delete state.blocks[ev.id][ev.target]
if (isEmpty(state.blocks[ev.id])) { delete state.blocks[ev.id] }
} else {
state.blocks[ev.id] = state.blocks[ev.id] || {}
state.blocks[ev.id][ev.target] = true
// if we blocked this peer, and we are also connected to them.
// then stop replicating immediately.
if (state.id === ev.id && state.peers[ev.target]) {
// end replication immediately.
state.peers[ev.target].blocked = ev.value
}
for (const id in state.peers) {
const peer = state.peers[id]
if (!peer.replicating) continue
if (id === ev.target && peer.replicating[ev.id]) { setNotes(peer, ev.id, -1, false) }
}
}
return state
}
return exports
}
/*
what does a fork proof look like?
usually, you have one message, and receive a subsequent message.
(n, n'+1), except that n'+1 does not extend n. but both have valid
signatures.
*/