-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathJoinEventStream.js
104 lines (94 loc) · 3.85 KB
/
JoinEventStream.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
const EventStream = require('./EventStream');
const { wrapAndCheck } = require('./util');
/**
* Calculate the actual version number from a possibly relative (negative) version number.
*
* @param {number} version The version to normalize.
* @param {number} length The maximum version number
* @returns {number} The absolute version number.
*/
function normalizeVersion(version, length) {
return version < 0 ? version + length + 1 : version;
}
/**
* An event stream is a simple wrapper around an iterator over storage documents.
* It implements a node readable stream interface.
*/
class JoinEventStream extends EventStream {
/**
* @param {string} name The name of the stream.
* @param {Array<string>} streams The name of the streams to join together.
* @param {EventStore} eventStore The event store to get the stream from.
* @param {number} [minRevision] The 1-based minimum revision to include in the events (inclusive).
* @param {number} [maxRevision] The 1-based maximum revision to include in the events (inclusive).
*/
constructor(name, streams, eventStore, minRevision = 1, maxRevision = -1) {
super(name, eventStore, minRevision, maxRevision);
if (!(streams instanceof Array) || streams.length === 0) {
throw new Error(`Invalid list of streams supplied to JoinStream ${name}.`);
}
this.streamIndex = eventStore.storage.index;
// Translate revisions to index numbers (1-based) and wrap around negatives
this.minRevision = normalizeVersion(minRevision, eventStore.length);
this.maxRevision = normalizeVersion(maxRevision, eventStore.length);
this.fetch = function() {
this._next = new Array(streams.length).fill(undefined);
return streams.map(streamName => {
if (!eventStore.streams[streamName]) {
return { next() { return { done: true }; } };
}
const streamIndex = eventStore.streams[streamName].index;
const from = streamIndex.find(this.minRevision, this.minRevision <= this.maxRevision);
const until = streamIndex.find(this.maxRevision, this.minRevision > this.maxRevision);
return eventStore.storage.readRange(from, until, streamIndex);
});
}
this._iterator = null;
}
/**
* Returns the value of the iterator at position `index`
* @param {number} index The iterator position for which to return the next value
* @returns {*}
*/
getValue(index) {
const next = this._iterator[index].next();
return next.done ? false : next.value;
}
/**
* @private
* @param {number} first
* @param {number} second
* @returns {boolean} If the first item follows after the second in the given read order determined by this.minRevision and this.maxRevision.
*/
follows(first, second) {
return (this.minRevision > this.maxRevision ? first < second : first > second);
}
/**
* @private
* @returns {object|boolean} The next event or false if no more events in the stream.
*/
next() {
if (!this._iterator) {
this._iterator = this.fetch();
}
let nextIndex = -1;
this._next.forEach((value, index) => {
if (typeof value === 'undefined') {
value = this._next[index] = this.getValue(index);
}
if (value === false) {
return;
}
if (nextIndex === -1 || this.follows(this._next[nextIndex].metadata.commitId, value.metadata.commitId)) {
nextIndex = index;
}
});
if (nextIndex === -1) {
return false;
}
const next = this._next[nextIndex];
this._next[nextIndex] = undefined;
return next;
}
}
module.exports = JoinEventStream;