-
Notifications
You must be signed in to change notification settings - Fork 2
/
EntityFrameworkEventStore.cs
55 lines (48 loc) · 1.97 KB
/
EntityFrameworkEventStore.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
using System;
using System.Collections.Generic;
using System.Data.Entity;
using System.Linq;
using System.Threading.Tasks;
namespace RdbmsEventStore.EntityFramework
{
/// <summary>
/// <see cref="IEventStore{TId, TEvent}"/> implementation that reads and appends events
/// through the <c>Events</c> set of an Entity Framework <see cref="DbContext"/>.
/// </summary>
/// <typeparam name="TId">Type of the stream identifier.</typeparam>
/// <typeparam name="TContext">Context type that exposes the <c>Events</c> set.</typeparam>
/// <typeparam name="TEvent">Event type stored in the context.</typeparam>
public class EntityFrameworkEventStore<TId, TContext, TEvent> : IEventStore<TId, TEvent>
    where TId : IEquatable<TId>
    where TContext : DbContext, IEventDbContext<TEvent>
    where TEvent : Event<TId>, IEvent<TId>, new()
{
    private readonly TContext _context;
    private readonly IEventFactory<TId, TEvent> _eventFactory;
    private readonly IWriteLock _writeLock;

    /// <summary>
    /// Creates an event store on top of the given context.
    /// </summary>
    /// <param name="context">Context whose <c>Events</c> set is queried and appended to.</param>
    /// <param name="eventFactory">Factory used by <see cref="Commit"/> to turn payloads into events.</param>
    /// <param name="writeLock">Lock acquired for the duration of each <see cref="Commit"/>.</param>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public EntityFrameworkEventStore(TContext context, IEventFactory<TId, TEvent> eventFactory, IWriteLock writeLock)
    {
        // Fail at construction time rather than with a NullReferenceException on first use.
        if (context == null)
        {
            throw new ArgumentNullException(nameof(context));
        }

        if (eventFactory == null)
        {
            throw new ArgumentNullException(nameof(eventFactory));
        }

        if (writeLock == null)
        {
            throw new ArgumentNullException(nameof(writeLock));
        }

        _context = context;
        _eventFactory = eventFactory;
        _writeLock = writeLock;
    }

    /// <summary>
    /// Loads the events of a stream without any additional query shaping.
    /// </summary>
    /// <param name="streamId">Identifier of the stream to read.</param>
    /// <returns>The events whose <c>StreamId</c> equals <paramref name="streamId"/>.</returns>
    public Task<IEnumerable<TEvent>> Events(TId streamId) => Events(streamId, query => query);

    /// <summary>
    /// Loads the events of a stream, letting the caller refine the query before it is executed.
    /// </summary>
    /// <param name="streamId">Identifier of the stream to read.</param>
    /// <param name="query">
    /// Transformation handed to the <c>Apply</c> extension (defined outside this file;
    /// presumably it invokes the delegate on the stream-filtered queryable - confirm).
    /// </param>
    /// <returns>The matching events, materialized into a list and not tracked by the context.</returns>
    public async Task<IEnumerable<TEvent>> Events(TId streamId, Func<IQueryable<TEvent>, IQueryable<TEvent>> query)
        => await _context.Events
            .Where(e => e.StreamId.Equals(streamId))
            .Apply(query)
            .AsNoTracking()
            .ToListAsync();

    /// <summary>
    /// Appends events built from <paramref name="payloads"/> to a stream, provided the stream
    /// is still at the version the caller expects.
    /// </summary>
    /// <param name="streamId">Identifier of the stream to append to.</param>
    /// <param name="versionBefore">
    /// Version the caller believes the stream is at; a stream without stored events is treated as version 0.
    /// </param>
    /// <param name="payloads">Payloads passed to the event factory.</param>
    /// <exception cref="InvalidOperationException">
    /// The highest stored version of the stream differs from <paramref name="versionBefore"/>.
    /// </exception>
    public async Task Commit(TId streamId, long versionBefore, params object[] payloads)
    {
        // The version check and the insert both happen while the write lock is held.
        using (await _writeLock.Aquire())
        {
            var highestVersionNumber = await _context.Events
                .Where(e => e.StreamId.Equals(streamId))
                .Select(e => e.Version)
                .DefaultIfEmpty(0L) // an empty stream yields 0 instead of MaxAsync failing
                .MaxAsync();

            if (highestVersionNumber != versionBefore)
            {
                // Refuse to write: continuing would number the new events from a stale version.
                // NOTE(review): if the project defines a dedicated conflict exception type,
                // prefer throwing that here.
                throw new InvalidOperationException(
                    $"Cannot commit to stream '{streamId}': expected version {versionBefore}, but the stream is at version {highestVersionNumber}.");
            }

            var events = _eventFactory.Create(streamId, versionBefore, payloads);
            _context.Events.AddRange(events);
            await _context.SaveChangesAsync();
        }
    }
}
}