forked from ssbc/epidemic-broadcast-trees
-
Notifications
You must be signed in to change notification settings - Fork 0
/
stream.js
129 lines (114 loc) · 3.69 KB
/
stream.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
const v3 = require('./v3')
module.exports = function (events) {
function timestamp () {
return Date.now()
}
function EBTStream (peer, remote, version, client, isMsg, onClose) {
this.paused = true // start out paused
this.remote = remote
this.peer = peer
this.version = version
this.peer.state = events.connect(this.peer.state, {
id: remote,
ts: timestamp(),
client
})
this.ended = false
this._onClose = onClose
this.isMsg = isMsg
this.sink = this.source = null
}
EBTStream.prototype.clock = function (clock) {
this.peer.state = events.peerClock(this.peer.state, {
id: this.remote,
value: clock,
ts: timestamp()
})
this.paused = false
this.peer.update()
if (this.source) this.source.resume()
}
EBTStream.prototype.write = function (data) {
if (this.peer.logging) {
if (Buffer.isBuffer(data)) { console.log('EBT:recv binary (' + this.peer.id + ')', '0x' + data.toString('hex')) } else { console.log('EBT:recv json (' + this.peer.id + ')', JSON.stringify(data, null, 2)) }
}
if (this.ended) throw new Error('write after ebt stream ended:' + this.remote)
if (this.isMsg(data)) {
this.peer.state = events.receive(this.peer.state, {
id: this.remote,
value: data,
ts: timestamp()
})
} else {
this.peer.state = events.notes(this.peer.state, {
id: this.remote,
value: data,
ts: timestamp()
})
}
this.peer.update(this.remote)
}
EBTStream.prototype.abort = EBTStream.prototype.end = function (err) {
this.ended = true
// check if we have already ended
if (!this.peer.state.peers[this.remote]) return
if (this.peer.logging) console.log('EBT:dcon', this.remote)
const peerState = this.peer.state.peers[this.remote]
this.peer.state = events.disconnect(this.peer.state, {
id: this.remote,
ts: timestamp()
})
if (this._onClose) this._onClose(peerState)
// remove from the peer...
delete this.peer.streams[this.remote]
if (this.source && !this.source.ended) this.source.abort(err)
if (this.sink && !this.sink.ended) this.sink.end(err)
}
EBTStream.prototype.canSend = function () {
const state = this.peer.state.peers[this.remote]
return (
this.sink &&
!this.sink.paused &&
!this.ended &&
// missing state means this peer was blocked,
// end immediately.
(state.blocked || state.msgs.length || state.notes)
)
}
EBTStream.prototype.resume = function () {
if (!this.sink || this.sink.paused) return
const state = this.peer.state.peers[this.remote]
while (this.canSend()) {
if (state.blocked) {
this.end()
} else if (state.msgs.length) {
if (this.peer.logging) {
if (Buffer.isBuffer(state.msgs[0])) {
console.log('EBT:send binary (' + this.peer.id + ')', '0x' + state.msgs[0].toString('hex'))
} else {
console.log('EBT:send json (' + this.peer.id + ')', JSON.stringify(state.msgs[0], null, 2))
}
}
this.sink.write(state.msgs.shift())
} else {
const notes = state.notes
state.notes = null
if (this.peer.logging) {
const formattedNotes = {}
for (const feed in notes) {
const seq = notes[feed]
formattedNotes[feed] = {
seq,
sequence: v3.getSequence(seq),
rx: v3.getReceive(seq)
}
}
console.log('EBT:send notes (' + this.peer.id + ')', formattedNotes)
}
this.sink.write(notes)
}
}
}
EBTStream.prototype.pipe = require('push-stream/pipe')
return EBTStream
}