-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathEventStore.js
411 lines (378 loc) · 16.9 KB
/
EventStore.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
const EventStream = require('./EventStream');
const JoinEventStream = require('./JoinEventStream');
const fs = require('fs');
const path = require('path');
const events = require('events');
const Storage = require('./Storage');
const Consumer = require('./Consumer');
const { assert } = require('./util');
/**
 * Special values for the `expectedVersion` argument of `EventStore#commit`.
 *
 * - `Any` (-1): skip the optimistic concurrency check, the stream may be at any version.
 * - `EmptyStream` (0): the stream must not contain any events yet.
 *
 * Frozen, because the object is shared by every store instance and exported to callers;
 * an accidental reassignment would silently change the concurrency checks everywhere.
 */
const ExpectedVersion = Object.freeze({
    Any: -1,
    EmptyStream: 0
});
/**
 * Thrown by `EventStore#commit` when the stream is not at the version the caller expected.
 */
class OptimisticConcurrencyError extends Error {
    /**
     * @param {...*} args The arguments are forwarded unchanged to the `Error` constructor.
     */
    constructor(...args) {
        super(...args);
        // Without this the instance reports itself as a plain "Error" in `err.name`,
        // `String(err)` and the first line of the stack trace.
        this.name = 'OptimisticConcurrencyError';
    }
}
/**
 * An event store optimized for working with many streams.
 * An event stream is implemented as an iterator over an index on the storage, therefore indexes need to be lightweight
 * and highly performant in read-only mode.
 *
 * Events emitted by this class: `ready`, `stream-available`, `stream-created`, `stream-deleted` and `commit`.
 */
class EventStore extends events.EventEmitter {

    /**
     * @param {string} [storeName] The name of the store which will be used as storage prefix. Default 'eventstore'.
     * @param {object} [config] An object with config options.
     * @param {string} [config.storageDirectory] The directory where the data should be stored. Default './data'.
     * @param {string} [config.streamsDirectory] The directory where the streams should be stored. Default '{storageDirectory}/streams'.
     * @param {object} [config.storageConfig] Additional config options given to the storage backend. See `Storage`.
     * @param {boolean} [config.readOnly] If the storage should be mounted in read-only mode.
     */
    constructor(storeName = 'eventstore', config = {}) {
        super();
        // Allow `new EventStore(config)`: a non-string first argument is taken as the config object.
        if (typeof storeName !== 'string') {
            config = storeName;
            storeName = 'eventstore';
        }

        this.storageDirectory = path.resolve(config.storageDirectory || /* istanbul ignore next */ './data');
        const defaults = {
            dataDirectory: this.storageDirectory,
            indexDirectory: config.streamsDirectory || path.join(this.storageDirectory, 'streams'),
            partitioner: (event) => event.stream,
            readOnly: config.readOnly || false
        };
        // Values given in `config.storageConfig` take precedence over the defaults above.
        const storageConfig = Object.assign(defaults, config.storageConfig);
        this.initialize(storeName, storageConfig);
    }

    /**
     * Open the storage backend and register all streams that already exist on disk.
     * Emits `ready` once the scan has finished. If the scan fails, the storage is closed and the error is
     * thrown from within the asynchronous scan callback (it is not delivered through an event).
     *
     * @private
     * @param {string} storeName
     * @param {object} storageConfig
     */
    initialize(storeName, storageConfig) {
        this.streamsDirectory = path.resolve(storageConfig.indexDirectory);

        this.storeName = storeName;
        this.storage = (storageConfig.readOnly === true) ?
            new Storage.ReadOnly(storeName, storageConfig)
            : new Storage(storeName, storageConfig);
        this.storage.open();
        // Prototype-less dictionary, so stream names can never collide with Object.prototype members.
        this.streams = Object.create(null);
        // The `_all` stream is backed by the primary index of the storage.
        this.streams._all = { index: this.storage.index };

        this.scanStreams((err) => {
            if (err) {
                this.storage.close();
                throw err;
            }
            this.emit('ready');
        });
    }

    /**
     * Scan the streams directory for existing streams so they are ready for `getEventStream()`.
     * Also subscribes to the `index-created` event of the storage, so indexes reported later are registered too.
     *
     * @private
     * @param {function} callback A callback that will be called when all existing streams are found.
     */
    scanStreams(callback) {
        /* istanbul ignore if */
        if (typeof callback !== 'function') {
            callback = () => {};
        }
        // Find existing streams by scanning the directory for file names that contain `stream-<name>` and end in `.index`
        fs.readdir(this.streamsDirectory, (err, files) => {
            if (err) {
                return callback(err);
            }
            for (const file of files) {
                const matches = file.match(/(stream-.*)\.index$/);
                if (matches !== null) {
                    this.registerStream(matches[1]);
                }
            }
            callback();
        });
        this.storage.on('index-created', this.registerStream.bind(this));
    }

    /**
     * Make an existing stream index known to this store and emit `stream-available` for it.
     * Names without the `stream-` prefix and streams that are already registered are ignored.
     *
     * @private
     * @param {string} name The full stream name, including the `stream-` prefix.
     */
    registerStream(name) {
        /* istanbul ignore if */
        if (!name.startsWith('stream-')) {
            return;
        }
        // Strip the 7 character `stream-` prefix.
        const streamName = name.slice(7);
        /* istanbul ignore if */
        if (streamName in this.streams) {
            return;
        }
        const index = this.storage.openIndex('stream-' + streamName);
        // deepcode ignore PrototypePollutionFunctionParams: streams is a Map
        this.streams[streamName] = { index };
        this.emit('stream-available', streamName);
    }

    /**
     * Close the event store and free up all resources.
     *
     * @api
     */
    close() {
        this.storage.close();
    }

    /**
     * Get the number of events stored.
     *
     * @api
     * @returns {number}
     */
    get length() {
        return this.storage.length;
    }

    /**
     * This method makes it so the last three arguments can be given either as:
     *  - expectedVersion, metadata, callback
     *  - expectedVersion, callback
     *  - metadata, callback
     *  - callback
     *
     * A single event is wrapped into an array. A missing or `null` metadata becomes an empty object and a
     * missing callback becomes a no-op function.
     *
     * @private
     * @param {Array<object>|object} events
     * @param {number} [expectedVersion]
     * @param {object|function} [metadata]
     * @param {function} [callback]
     * @returns {{events: Array<object>, metadata: object, callback: function, expectedVersion: number}}
     */
    static fixArgumentTypes(events, expectedVersion, metadata, callback) {
        if (!Array.isArray(events)) {
            events = [events];
        }
        if (typeof expectedVersion !== 'number') {
            // No version was given, so the remaining arguments are shifted one position to the left.
            callback = metadata;
            metadata = expectedVersion;
            expectedVersion = ExpectedVersion.Any;
        }
        if (typeof metadata === 'function') {
            // The callback was given in the metadata position.
            callback = metadata;
            metadata = {};
        } else if (typeof metadata !== 'object' || metadata === null) {
            // `typeof null` is 'object', so null has to be rejected explicitly. A callback that was
            // given in its regular position is kept as is.
            metadata = {};
        }
        if (typeof callback !== 'function') {
            callback = () => {};
        }
        return { events, expectedVersion, metadata, callback };
    }

    /**
     * Commit a list of events for the given stream name, which is expected to be at the given version.
     * Note that the events committed may still appear in other streams too - the given stream name is only
     * relevant for optimistic concurrency checks with the given expected version.
     *
     * If the stream does not exist yet, it is created with a matcher on the stream name.
     * The `commit` event is emitted and the callback invoked with the commit object once the last event has
     * been written. Note that for an empty events array nothing is written, hence neither of them happens.
     *
     * @api
     * @param {string} streamName The name of the stream to commit the events to.
     * @param {Array<object>|object} events The events to commit or a single event.
     * @param {number} [expectedVersion] One of ExpectedVersion constants or a positive version number that the stream is supposed to be at before commit.
     * @param {object} [metadata] The commit metadata to use as base. Useful for replication and adding storage metadata. The given object is not modified.
     * @param {function} [callback] A function that will be executed when all events have been committed.
     * @throws {OptimisticConcurrencyError} if the stream is not at the expected version.
     */
    commit(streamName, events, expectedVersion = ExpectedVersion.Any, metadata = {}, callback = null) {
        assert(!(this.storage instanceof Storage.ReadOnly), 'The storage was opened in read-only mode. Can not commit to it.');
        assert(typeof streamName === 'string' && streamName !== '', 'Must specify a stream name for commit.');
        assert(typeof events !== 'undefined' && events !== null, 'No events specified for commit.');

        ({ events, expectedVersion, metadata, callback } = EventStore.fixArgumentTypes(events, expectedVersion, metadata, callback));

        if (!(streamName in this.streams)) {
            this.createEventStream(streamName, { stream: streamName });
        }
        let streamVersion = this.streams[streamName].index.length;
        if (expectedVersion !== ExpectedVersion.Any && streamVersion !== expectedVersion) {
            throw new OptimisticConcurrencyError(`Optimistic Concurrency error. Expected stream "${streamName}" at version ${expectedVersion} but is at version ${streamVersion}.`);
        }

        // Work on a shallow copy, so the metadata object owned by the caller is never mutated.
        const baseMetadata = Object.assign({}, metadata);
        if (events.length > 1) {
            // A given commitVersion can not apply to every event of a multi-event commit; it is counted per event below.
            delete baseMetadata.commitVersion;
        }

        const commitId = this.length;
        let commitVersion = 0;
        const commitSize = events.length;
        const committedAt = Date.now();
        const commit = Object.assign({
            commitId,
            committedAt
        }, baseMetadata, {
            streamName,
            streamVersion,
            events: []
        });
        const commitCallback = () => {
            this.emit('commit', commit);
            callback(commit);
        };
        for (const event of events) {
            // Values from the given metadata override the generated ones, except for the streamVersion.
            const eventMetadata = Object.assign({ commitId, committedAt, commitVersion, commitSize }, baseMetadata, { streamVersion });
            const storedEvent = { stream: streamName, payload: event, metadata: eventMetadata };
            commitVersion++;
            streamVersion++;
            commit.events.push(event);
            // Only the write of the last event carries the completion callback.
            this.storage.write(storedEvent, commitVersion !== events.length ? undefined : commitCallback);
        }
    }

    /**
     * @api
     * @param {string} streamName The name of the stream to get the version for.
     * @returns {number} The version that the given stream is at currently, or -1 if the stream does not exist.
     */
    getStreamVersion(streamName) {
        if (!(streamName in this.streams)) {
            return -1;
        }
        return this.streams[streamName].index.length;
    }

    /**
     * Get an event stream for the given stream name within the revision boundaries.
     *
     * @api
     * @param {string} streamName The name of the stream to get.
     * @param {number} [minRevision] The 1-based minimum revision to include in the events (inclusive).
     * @param {number} [maxRevision] The 1-based maximum revision to include in the events (inclusive).
     * @returns {EventStream|boolean} The event stream or false if a stream with the name doesn't exist.
     */
    getEventStream(streamName, minRevision = 1, maxRevision = -1) {
        if (!(streamName in this.streams)) {
            return false;
        }
        return new EventStream(streamName, this, minRevision, maxRevision);
    }

    /**
     * Get a stream for all events within the revision boundaries.
     * This is the same as `getEventStream('_all', ...)`.
     *
     * @api
     * @param {number} [minRevision] The 1-based minimum revision to include in the events (inclusive).
     * @param {number} [maxRevision] The 1-based maximum revision to include in the events (inclusive).
     * @returns {EventStream} The event stream.
     */
    getAllEvents(minRevision = 1, maxRevision = -1) {
        return this.getEventStream('_all', minRevision, maxRevision);
    }

    /**
     * Create a virtual event stream from existing streams by joining them.
     *
     * @param {string} streamName The (transient) name of the joined stream.
     * @param {Array<string>} streamNames An array of the stream names to join.
     * @param {number} [minRevision] The 1-based minimum revision to include in the events (inclusive).
     * @param {number} [maxRevision] The 1-based maximum revision to include in the events (inclusive).
     * @returns {EventStream} The joined event stream.
     * @throws {Error} if any of the streams doesn't exist.
     */
    fromStreams(streamName, streamNames, minRevision = 1, maxRevision = -1) {
        assert(Array.isArray(streamNames), 'Must specify an array of stream names.');
        for (const stream of streamNames) {
            assert(stream in this.streams, `Stream "${stream}" does not exist.`);
        }
        return new JoinEventStream(streamName, streamNames, this, minRevision, maxRevision);
    }

    /**
     * Get a stream for a category of streams. This will effectively return a joined stream of all streams that start
     * with the given `categoryName` followed by a dash.
     * If a stream named exactly like the category exists, that stream is returned instead of a joined one.
     * If you frequently use this for a category consisting of a lot of streams (e.g. `users`), consider creating a
     * dedicated physical stream for the category:
     *
     *     `eventstore.createEventStream('users', e => e.stream.startsWith('users-'))`
     *
     * @api
     * @param {string} categoryName The name of the category to get a stream for. A category is a stream name prefix.
     * @param {number} [minRevision] The 1-based minimum revision to include in the events (inclusive).
     * @param {number} [maxRevision] The 1-based maximum revision to include in the events (inclusive).
     * @returns {EventStream} The joined event stream for all streams of the given category.
     * @throws {Error} If no stream for this category exists.
     */
    getEventStreamForCategory(categoryName, minRevision = 1, maxRevision = -1) {
        if (categoryName in this.streams) {
            return this.getEventStream(categoryName, minRevision, maxRevision);
        }
        const categoryStreams = Object.keys(this.streams).filter(streamName => streamName.startsWith(categoryName + '-'));

        if (categoryStreams.length === 0) {
            throw new Error(`No streams for category '${categoryName}' exist.`);
        }
        return this.fromStreams(categoryName, categoryStreams, minRevision, maxRevision);
    }

    /**
     * Create a new stream with the given matcher. Emits `stream-created` with the stream name.
     *
     * @api
     * @param {string} streamName The name of the stream to create.
     * @param {object|function(event)} matcher A matcher object, denoting the properties that need to match on an event, or a function that takes the event and returns true if the event should be added.
     * @returns {EventStream} The EventStream with all existing events matching the matcher.
     * @throws {Error} If a stream with that name already exists.
     * @throws {Error} If the stream could not be created.
     */
    createEventStream(streamName, matcher) {
        assert(!(this.storage instanceof Storage.ReadOnly), 'The storage was opened in read-only mode. Can not create new stream on it.');
        assert(!(streamName in this.streams), 'Can not recreate stream!');

        const streamIndexName = 'stream-' + streamName;
        const index = this.storage.ensureIndex(streamIndexName, matcher);
        assert(index !== null, `Error creating stream index ${streamName}.`);

        // deepcode ignore PrototypePollutionFunctionParams: streams is a Map
        this.streams[streamName] = { index, matcher };
        this.emit('stream-created', streamName);
        return new EventStream(streamName, this);
    }

    /**
     * Delete an event stream. Will do nothing if the stream with the name doesn't exist.
     * Emits `stream-deleted` with the stream name after the stream was removed.
     *
     * Note that you can delete a write stream, but that will not delete the events written to it.
     * Also, on next write, that stream will be rebuilt from all existing events, which might take some time.
     *
     * @api
     * @param {string} streamName The name of the stream to delete.
     * @returns void
     */
    deleteEventStream(streamName) {
        assert(!(this.storage instanceof Storage.ReadOnly), 'The storage was opened in read-only mode. Can not delete a stream on it.');

        if (!(streamName in this.streams)) {
            return;
        }
        this.streams[streamName].index.destroy();
        delete this.streams[streamName];
        this.emit('stream-deleted', streamName);
    }

    /**
     * Get a durable consumer for the given stream that will keep receiving events from the last position.
     *
     * @param {string} streamName The name of the stream to consume.
     * @param {string} identifier The unique identifying name of this consumer.
     * @param {object} [initialState] The initial state of the consumer.
     * @param {number} [since] The stream revision to start consuming from.
     * @returns {Consumer} A durable consumer for the given stream.
     */
    getConsumer(streamName, identifier, initialState = {}, since = 0) {
        // Every stream except `_all` is consumed through its index, which carries the `stream-` prefix.
        const indexName = streamName === '_all' ? '_all' : 'stream-' + streamName;
        const consumer = new Consumer(this.storage, indexName, identifier, initialState, since);
        consumer.streamName = streamName;
        return consumer;
    }

    /**
     * Scan the existing consumers on this EventStore and asynchronously return a list of their names.
     * A consumer is recognized by a file inside the `consumers` directory (below the index directory of the storage)
     * that is named `<storageFile>.<indexName>.<identifier>`; the reported name is the `<indexName>.<identifier>` part.
     * If the directory does not exist, the callback is invoked synchronously with an empty list.
     *
     * @param {function(error: Error, consumers: array)} callback A callback that will receive an error as first and the list of consumers as second argument.
     */
    scanConsumers(callback) {
        const consumersPath = path.join(this.storage.indexDirectory, 'consumers');
        if (!fs.existsSync(consumersPath)) {
            callback(null, []);
            return;
        }
        fs.readdir(consumersPath, (err, files) => {
            /* istanbul ignore if */
            if (err) {
                return callback(err, []);
            }
            // Escape the storage file name, so characters like `.` or `+` in it are matched literally.
            const escapedStorageFile = String(this.storage.storageFile).replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
            // The backslashes need to be doubled inside the template literal; a single `\.` would end up
            // as a bare `.` in the pattern and match any character.
            const regex = new RegExp(`^${escapedStorageFile}\\.([^.]*\\..*)$`);
            const consumers = [];
            for (const file of files) {
                const matches = file.match(regex);
                if (matches !== null) {
                    consumers.push(matches[1]);
                }
            }
            callback(null, consumers);
        });
    }
}
// The class itself is the module export; the concurrency helpers and the lock mode
// constants of the storage backend are attached to it as static properties.
Object.assign(EventStore, {
    ExpectedVersion,
    OptimisticConcurrencyError,
    LOCK_THROW: Storage.LOCK_THROW,
    LOCK_RECLAIM: Storage.LOCK_RECLAIM
});
module.exports = EventStore;