-
Notifications
You must be signed in to change notification settings - Fork 29
/
combineLatest.js
41 lines (40 loc) · 1.25 KB
/
combineLatest.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
const Rx = require('./index');
const GroupSubscription = require('./GroupSubscription');
/**
* Rx.Observable.combineLatest = require('toy-rx/combineLatest');
*/
module.exports = function combineLatest() {
const inObservables = Array.prototype.slice.call(arguments);
const transformFn = inObservables.pop();
const outObservable = Rx.Observable.create(function subscribe(outObserver) {
const values = inObservables.map((inObservable) => undefined);
const gotValue = inObservables.map((inObservable) => false);
const outSubscription = new GroupSubscription();
inObservables.forEach((inObservable, index) => {
const inObserver = {
next: (x) => {
values[index] = x;
gotValue[index] = true;
if (gotValue.every(x => x === true)) {
try {
var y = transformFn(...values);
} catch (e) {
outObserver.error(e);
return
}
outObserver.next(y);
}
},
error: (e) => {
outObserver.error(e);
},
complete: () => {
outObserver.complete();
}
}
outSubscription.add(inObservable.subscribe(inObserver));
});
return outSubscription;
});
return outObservable;
}