Skip to content

Commit

Permalink
Simplify event stream (#36)
Browse files Browse the repository at this point in the history
* Simplify event store by re-using overloads

* Move QueryableExtensions to System.Linq

* Fix naming of private field

* Implement derivable overloads with extension methods

* Rename IEventStream back to IEventStore
  • Loading branch information
Tomas Lycken authored Nov 1, 2017
1 parent 432a03b commit 81238d3
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 59 deletions.
2 changes: 1 addition & 1 deletion GitVersion.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
mode: ContinuousDelivery
next-version: 0.4.1
next-version: 0.5.0
branches:
master:
tag: beta
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public ExtraMetaTests(ExtraMetaEventFactoryFixture fixture, AssemblyInitializerF
[InlineData("stream-2", 2)]
public async Task ReturnsEventsFromCorrectStreamOnly(string streamId, long expectedCount)
{
var store = _fixture.BuildEventStore(_dbContext) as IEventStream<string, ExtraMetaStringEvent, IExtraMeta>;
var store = _fixture.BuildEventStore(_dbContext) as IEventStore<string, ExtraMetaStringEvent, IExtraMeta>;
var events = await store.Events(streamId);
Assert.Equal(expectedCount, events.Count());
}
Expand All @@ -53,7 +53,7 @@ public async Task ReturnsEventsFromCorrectStreamOnly(string streamId, long expec
[InlineData("stream-2", 1)]
public async Task ReturnsEventsAccordingToQuery(string streamId, long expectedCount)
{
var store = _fixture.BuildEventStore(_dbContext) as IEventStream<string, ExtraMetaStringEvent, IExtraMeta>;
var store = _fixture.BuildEventStore(_dbContext) as IEventStore<string, ExtraMetaStringEvent, IExtraMeta>;
var events = await store.Events(streamId, es => es.Where(e => e.ExtraMeta.StartsWith("Foo")));
Assert.Equal(expectedCount, events.Count());
}
Expand All @@ -63,7 +63,7 @@ public async Task ReturnsEventsAccordingToQuery(string streamId, long expectedCo
[InlineData("stream-2", 1)]
public async Task ReturnsEventsWithMetadata(string streamId, long expectedCount)
{
var store = _fixture.BuildEventStore(_dbContext) as IEventStream<string, ExtraMetaStringEvent, IExtraMeta>;
var store = _fixture.BuildEventStore(_dbContext) as IEventStore<string, ExtraMetaStringEvent, IExtraMeta>;
var events = await store.Events(streamId, es => es.Where(e => e.ExtraMeta.StartsWith("Foo")));
Assert.All(events, @event => Assert.StartsWith("Foo", @event.ExtraMeta));
}
Expand All @@ -73,7 +73,7 @@ public async Task ReturnsEventsWithMetadata(string streamId, long expectedCount)
[InlineData("stream-2", 1)]
public async Task CanQueryByExtraMetadata(string streamId, long expectedCount)
{
var store = _fixture.BuildEventStore(_dbContext) as IEventStream<string, ExtraMetaStringEvent, IExtraMeta>;
var store = _fixture.BuildEventStore(_dbContext) as IEventStore<string, ExtraMetaStringEvent, IExtraMeta>;
var events = await store.Events(streamId, es => es.Where(e => e.ExtraMeta.StartsWith("Foo")));
Assert.Equal(expectedCount, events.Count());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public QueryEventsTests(EventStoreFixture<long, string, StringEvent, IEventMetad
[InlineData("stream-2", 2)]
public async Task ReturnsEventsFromCorrectStreamOnly(string streamId, long expectedCount)
{
var store = _fixture.BuildEventStore(_dbContext) as IEventStream<string, StringEvent, IEventMetadata<string>>;
var store = _fixture.BuildEventStore(_dbContext) as IEventStore<string, StringEvent, IEventMetadata<string>>;
var events = await store.Events(streamId);
Assert.Equal(expectedCount, events.Count());
}
Expand All @@ -42,23 +42,23 @@ public async Task ReturnsEventsFromCorrectStreamOnly(string streamId, long expec
[InlineData("stream-2", 1)]
public async Task ReturnsEventsAccordingToQuery(string streamId, long expectedCount)
{
var store = _fixture.BuildEventStore(_dbContext) as IEventStream<string, StringEvent, IEventMetadata<string>>;
var store = _fixture.BuildEventStore(_dbContext) as IEventStore<string, StringEvent, IEventMetadata<string>>;
var events = await store.Events(streamId, es => es.Where(e => e.Version > 1));
Assert.Equal(expectedCount, events.Count());
}

[Fact]
public async Task ReturnsAllEvents()
{
var store = _fixture.BuildEventStore(_dbContext) as IEventStream<string, StringEvent, IEventMetadata<string>>;
var store = _fixture.BuildEventStore(_dbContext) as IEventStore<string, StringEvent, IEventMetadata<string>>;
var events = await store.Events();
Assert.Equal(5, events.Count());
}

[Fact]
public async Task ReturnsAllEventsAccordingToQuery()
{
var store = _fixture.BuildEventStore(_dbContext) as IEventStream<string, StringEvent, IEventMetadata<string>>;
var store = _fixture.BuildEventStore(_dbContext) as IEventStore<string, StringEvent, IEventMetadata<string>>;
var events = await store.Events(es => es.Where(e => e.Version > 1));
Assert.Equal(3, events.Count());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public async Task CanReadEventsFromStoreWithNonDefaultImplementations()
});
await _dbContext.SaveChangesAsync();

var store = _fixture.BuildEventStore(_dbContext) as IEventStream<long, NonDefaultEvent, IEventMetadata<long>>;
var store = _fixture.BuildEventStore(_dbContext) as IEventStore<long, NonDefaultEvent, IEventMetadata<long>>;

var events = await store.Events(1);

Expand Down
37 changes: 7 additions & 30 deletions src/RdbmsEventStore.EntityFramework/EntityFrameworkEventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,30 @@

namespace RdbmsEventStore.EntityFramework
{
public class EntityFrameworkEventStore<TId, TStreamId, TContext, TEvent, TEventMetadata, TPersistedEvent> : IEventStream<TStreamId, TEvent, TEventMetadata>
public class EntityFrameworkEventStore<TId, TStreamId, TContext, TEvent, TEventMetadata, TPersistedEvent> : IEventStore<TStreamId, TEvent, TEventMetadata>
where TId : IEquatable<TId>
where TStreamId : IEquatable<TStreamId>
where TContext : DbContext, IEventDbContext<TPersistedEvent>
where TEvent : class, TEventMetadata, IMutableEvent<TStreamId>, new()
where TPersistedEvent : class, IPersistedEvent<TStreamId>, TEventMetadata, new()
where TEventMetadata : class, IEventMetadata<TStreamId>
{
private readonly TContext context;
private readonly TContext _context;
private readonly IEventFactory<TStreamId, TEvent> _eventFactory;
private readonly IWriteLock<TStreamId> _writeLock;
private readonly IEventSerializer<TEvent, TPersistedEvent> _serializer;

public EntityFrameworkEventStore(TContext context, IEventFactory<TStreamId, TEvent> eventFactory, IWriteLock<TStreamId> writeLock, IEventSerializer<TEvent, TPersistedEvent> serializer)
{
this.context = context;
_context = context;
_eventFactory = eventFactory;
_writeLock = writeLock;
_serializer = serializer;
}

public Task<IEnumerable<TEvent>> Events() => Events(events => events);

public async Task<IEnumerable<TEvent>> Events(Func<IQueryable<TEventMetadata>, IQueryable<TEventMetadata>> query)
{
var storedEvents = await context.Events
var storedEvents = await _context.Events
.AsNoTracking()
.Apply(query)
.OrderBy(e => e.Timestamp)
Expand All @@ -45,32 +43,11 @@ public async Task<IEnumerable<TEvent>> Events(Func<IQueryable<TEventMetadata>, I
return events;
}

public Task<IEnumerable<TEvent>> Events(TStreamId streamId) => Events(streamId, events => events);

public async Task<IEnumerable<TEvent>> Events(TStreamId streamId, Func<IQueryable<TEventMetadata>, IQueryable<TEventMetadata>> query)
{
var storedEvents = await context.Events
.Where(e => e.StreamId.Equals(streamId))
.AsNoTracking()
.Apply(query)
.OrderBy(e => e.Timestamp)
.ToListAsync();

var events = storedEvents
.Cast<TPersistedEvent>()
.Select(_serializer.Deserialize);

return events;
}

public Task Append(TStreamId streamId, long versionBefore, object payload)
=> Append(streamId, versionBefore, new[] { payload });

public async Task Append(TStreamId streamId, long versionBefore, IEnumerable<object> payloads)
{
using (await _writeLock.Aquire(streamId))
{
var highestVersionNumber = await context.Events
var highestVersionNumber = await _context.Events
.Where(e => e.StreamId.Equals(streamId))
.Select(e => e.Version)
.DefaultIfEmpty(0)
Expand All @@ -82,8 +59,8 @@ public async Task Append(TStreamId streamId, long versionBefore, IEnumerable<obj
}

var events = _eventFactory.Create(streamId, versionBefore, payloads).Select(_serializer.Serialize);
context.Events.AddRange(events);
await context.SaveChangesAsync();
_context.Events.AddRange(events);
await _context.SaveChangesAsync();
}
}
}
Expand Down
10 changes: 0 additions & 10 deletions src/RdbmsEventStore.EntityFramework/QueryableExtensions.cs

This file was deleted.

34 changes: 34 additions & 0 deletions src/RdbmsEventStore/EventStoreExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace RdbmsEventStore
{
public static class EventStoreExtensions
{
public static Task<IEnumerable<TEvent>> Events<TStreamId, TEvent, TEventMetadata>(this IEventStore<TStreamId, TEvent, TEventMetadata> store)
where TStreamId : IEquatable<TStreamId>
where TEvent : class, TEventMetadata, IEvent<TStreamId>
where TEventMetadata : class, IEventMetadata<TStreamId>
=> store.Events(events => events);

public static Task<IEnumerable<TEvent>> Events<TStreamId, TEvent, TEventMetadata>(this IEventStore<TStreamId, TEvent, TEventMetadata> store, TStreamId streamId)
where TStreamId : IEquatable<TStreamId>
where TEvent : class, TEventMetadata, IEvent<TStreamId>
where TEventMetadata : class, IEventMetadata<TStreamId>
=> store.Events(streamId, events => events);

public static Task<IEnumerable<TEvent>> Events<TStreamId, TEvent, TEventMetadata>(this IEventStore<TStreamId, TEvent, TEventMetadata> store, TStreamId streamId, Func<IQueryable<TEventMetadata>, IQueryable<TEventMetadata>> query)
where TStreamId : IEquatable<TStreamId>
where TEvent : class, TEventMetadata, IEvent<TStreamId>
where TEventMetadata : class, IEventMetadata<TStreamId>
=> store.Events(events => events.Where(e => e.StreamId.Equals(streamId)).Apply(query));

public static Task Append<TStreamId, TEvent, TEventMetadata>(this IEventStore<TStreamId, TEvent, TEventMetadata> store, TStreamId streamId, long versionBefore, object payload)
where TStreamId : IEquatable<TStreamId>
where TEvent : class, TEventMetadata, IEvent<TStreamId>
where TEventMetadata : class, IEventMetadata<TStreamId>
=> store.Append(streamId, versionBefore, new[] { payload });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,13 @@

namespace RdbmsEventStore
{
public interface IEventStream<in TStreamId, TEvent, TEventMetadata>
public interface IEventStore<in TStreamId, TEvent, TEventMetadata>
where TStreamId : IEquatable<TStreamId>
where TEvent : IEvent<TStreamId>, TEventMetadata
where TEventMetadata : IEventMetadata<TStreamId>
{
Task<IEnumerable<TEvent>> Events();

Task<IEnumerable<TEvent>> Events(Func<IQueryable<TEventMetadata>, IQueryable<TEventMetadata>> query);

Task<IEnumerable<TEvent>> Events(TStreamId streamId);

Task<IEnumerable<TEvent>> Events(TStreamId streamId, Func<IQueryable<TEventMetadata>, IQueryable<TEventMetadata>> query);

Task Append(TStreamId streamId, long versionBefore, object payload);

Task Append(TStreamId streamId, long versionBefore, IEnumerable<object> payloads);
}
}
8 changes: 8 additions & 0 deletions src/RdbmsEventStore/QueryableExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// ReSharper disable once CheckNamespace
namespace System.Linq
{
public static class QueryableExtensions
{
public static IQueryable<T> Apply<T>(this IQueryable<T> source, Func<IQueryable<T>, IQueryable<T>> projection) => projection(source);
}
}

0 comments on commit 81238d3

Please sign in to comment.