-
Notifications
You must be signed in to change notification settings - Fork 3
/
index.js
121 lines (101 loc) · 2.98 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
const sodium = require('sodium-universal')
const { EventEmitter } = require('events')
const codecs = require('codecs')
// Base delay, in milliseconds (15 minutes), that the announce loop waits
// between two consecutive announce rounds.
const REANNOUNCE = 15 * 60000
// Upper bound, in milliseconds (2 minutes), of the extra random delay added
// after the base delay in each iteration of the announce loop.
const SLACK = 2 * 60000
// Number of 'data' events the put stream must emit before the record flags
// itself as announced and emits 'announced'.
const MIN_ACKS = 5
module.exports = class ImmutableRecord extends EventEmitter {
constructor (dht, key = null, val = null, opts) {
super()
this.dht = dht
this.key = key
this.value = val
this.announcing = false
this.announced = false
this.encoding = codecs(opts && opts.encoding || null)
this._announceRunning = null
this._announceStream = null
this._timeout = null
if (!this.key) {
this.key = Buffer.alloc(32)
sodium.crypto_generichash(this.key, this.value)
}
}
async announce () {
const wasAnnouncing = this.announcing
this.announcing = true
await this._announceRunning
if (!wasAnnouncing && this.announcing && !this._announceRunning) {
this._announceRunning = this._announce()
}
return this._announceRunning
}
unannounce (force = true) {
if (!this.announcing) return
this.announcing = false
if (this._timeout) clearTimeout(this._timeout)
if (this._timeoutResolve) this._timeoutResolve()
if (force && this._announceStream) this._announceStream.destroy()
}
async _announce () {
while (this.announcing) {
await this._announceOnce()
await this._wait(REANNOUNCE)
await this._wait((SLACK * Math.random()) | 0)
}
}
async _announceOnce () {
if (this.value === null) await this.get()
return new Promise((resolve) => {
const self = this
const value = this.encoding ? this.encoding.encode(this.value) : this.value
let acks = 0
this._announceStream = this.dht.immutable.put(value, done)
this._announceStream.on('data', () => {
if (!this.announced && ++acks >= MIN_ACKS) {
this.announced = true
this.emit('announced')
}
})
function done (err) {
self._announceStream = null
resolve(!err)
}
})
}
_wait (n) {
if (!this.announcing) return
return new Promise(resolve => {
const done = () => {
this._timeout = null
this._timeoutResolve = null
resolve()
}
this._timeout = setTimeout(done, n)
this._timeoutResolve = done
})
}
static put (node, val, opts) {
if (typeof val === 'string') val = Buffer.from(val)
return new ImmutableRecord(node, null, opts)
}
static get (node, key, opts) {
if (typeof key === 'string') key = Buffer.from(key, 'hex')
return new ImmutableRecord(node, key, opts)
}
async get () {
if (this.value) return this.value
return new Promise((resolve, reject) => {
this.dht.immutable.get(this.key, (err, val) => {
if (this.value) return resolve(this.value)
if (err) return reject(err)
try {
this.value = this.encoding ? this.encoding.decode(val) : val
} catch (err) {
return reject(err)
}
resolve(val)
})
})
}
}