-
Notifications
You must be signed in to change notification settings - Fork 4
/
index.js
134 lines (111 loc) · 2.88 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
/*
all pull streams have these states:
{
START: {
read: READING,
abort: ABORTING
},
READY: {
read: READING,
abort: ABORTING
},
READING: {
cb: READY,
err: ERROR,
end: END
},
ABORTING: {
cb: END
},
ERROR: {},
END: {}
}
this module takes a collection of pull-streams,
and interleaves their states.
if all the streams have ended, it ends.
If it is in reading state, and one stream goes through READING->cb,
it goes into READY
on read, trigger read on every stream in START or READY
on abort, trigger abort on all streams immediately***
if a stream is in READY, and big stream is in ABORT,
trigger abort
if every stream is in END or ERROR, trigger end or error
could you describe this declaratively or something?
*/
module.exports = function (ary) {
var capped = !!ary
var inputs = (ary || []).map(create), i = 0, abort, cb
function create (stream) {
return {ready: false, reading: false, ended: false, read: stream, data: null}
}
function check () {
if(!cb) return
clean()
var l = inputs.length
var _cb = cb
if(l === 0 && (abort || capped)) {
cb = null; _cb(abort || true)
return
}
//scan the inputs to check whether there is one we can use.
for(var j = 0; j < l; j++) {
var current = inputs[(i + j) % l]
if(current.ready && !current.ended) {
var data = current.data
current.ready = false
current.data = null
i ++; cb = null
return _cb(null, data)
}
}
}
function clean () {
var l = inputs.length
//iterate backwards so that we can remove items.
while(l--) {
if(inputs[l].ended)
inputs.splice(l, 1)
}
}
function next () {
var l = inputs.length
while(l--)
(function (current) {
//read the next item if we aren't already
if(l > inputs.length) throw new Error('this should never happen')
if(current.reading || current.ended || current.ready) return
current.reading = true
var sync = true
current.read(abort, function next (end, data) {
current.data = data
current.ready = true
current.reading = false
if(end === true || abort) current.ended = true
else if(end) abort = current.ended = end
//check whether we need to abort this stream.
if(abort && !end) current.read(abort, next)
if(!sync) check()
})
sync = false
})(inputs[l])
//scan the feed
check()
}
function read (_abort, _cb) {
abort = abort || _abort; cb = _cb; next()
}
read.add = function (stream) {
if(!stream) {
//the stream will now end when all the streams end.
capped = true
//we just changed state, so we may need to cb
return next()
}
inputs.push(create(stream))
next()
}
read.cap = function (err) {
read.add(null)
}
return read
}