Skip to content

Commit

Permalink
Don't expose the version number (#47)
Browse files Browse the repository at this point in the history
Exposing the version number is problematic, since if we want to change
history by deleting or adding events in the past, we have to re-version
every single event that comes later in the stream.

Instead, we now use timestamps for versioning.

However, neither EF nor EF Core guarantees the ordering of entities that
are inserted in the same batch, which means we need a way to version
events that are inserted togehter to ensure that they are retreived in
the same order they were created. Therefore, we keep the Version property
on the persisted event types, but use it only internally inside the EF
providers.

The version number is also now grouped per timestamp, so that for each new
timestamp the version number restarts.
  • Loading branch information
Tomas Lycken committed Dec 5, 2017
1 parent 5b44821 commit 93a2faf
Show file tree
Hide file tree
Showing 23 changed files with 150 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ public ExtraMetaTests(ExtraMetaEventFactoryFixture fixture)
.Options;
_dbContext = new EFCoreEventStoreContext<string, ExtraMetaLongStringPersistedEventMetadata>(options);

var stream1 = _fixture.EventFactory.Create("stream-1", 0, new object[] {
var stream1 = _fixture.EventFactory.Create("stream-1", new object[] {
new FooEvent { Foo = "Foo" },
new BarEvent { Bar = "Bar" },
new FooEvent { Foo = "Baz" }
})
.Select(_fixture.EventSerializer.Serialize);
var stream2 = _fixture.EventFactory.Create("stream-2", 0, new object[] {
var stream2 = _fixture.EventFactory.Create("stream-2", new object[] {
new FooEvent { Foo = "Boo" },
new BarEvent { Bar = "Far" }
})
Expand Down Expand Up @@ -89,9 +89,9 @@ public class ExtraMetaEventFactory : DefaultEventFactory<string, ExtraMetaString
{
private int _total;

protected override ExtraMetaStringEvent CreateSingle(string streamId, long version, object payload)
protected override ExtraMetaStringEvent CreateSingle(string streamId, object payload)
{
var @event = base.CreateSingle(streamId, version, payload);
var @event = base.CreateSingle(streamId, payload);
@event.ExtraMeta = $"{payload.GetType().Name}-{_total++}";
return @event;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using RdbmsEventStore.EFCore.Tests.Infrastructure;
using RdbmsEventStore.EFCore.Tests.TestData;
using Xunit;
Expand All @@ -10,23 +12,24 @@ public class QueryEventsTests : EventStoreTestBase<long, string, StringEvent, IE
{
public QueryEventsTests(EventStoreFixture<long, string, StringEvent, IEventMetadata<string>, LongStringPersistedEvent> fixture) : base(fixture)
{
var stream1 = _fixture.EventFactory.Create("stream-1", 0, new object[] {
new FooEvent { Foo = "Foo" },
new BarEvent { Bar = "Bar" },
new FooEvent { Foo = "Baz" }
})
.Select(_fixture.EventSerializer.Serialize);
var stream2 = _fixture.EventFactory.Create("stream-2", 0, new object[] {
new FooEvent { Foo = "Boo" },
new BarEvent { Bar = "Far" }
})
.Select(_fixture.EventSerializer.Serialize);
AddEvents("stream-1", new FooEvent { Foo = "Foo" });
Thread.Sleep(10); // to make timestamps differ between the first and the rest
AddEvents("stream-1", new BarEvent { Bar = "Bar" }, new FooEvent { Foo = "Baz" });
AddEvents("stream-2", new FooEvent { Foo = "Boo" });
Thread.Sleep(10);
AddEvents("stream-2", new BarEvent { Bar = "Far" });

_dbContext.Events.AddRange(stream1);
_dbContext.Events.AddRange(stream2);
_dbContext.SaveChanges();
}

private void AddEvents(string streamId, params object[] payloads)
{
var events = _fixture.EventFactory.Create(streamId, payloads)
.Select(_fixture.EventSerializer.Serialize)
.VersionedPerTimestamp<LongStringPersistedEvent, string>();
_dbContext.Events.AddRange(events);
}

[Theory]
[InlineData("stream-1", 3)]
[InlineData("stream-2", 2)]
Expand All @@ -42,8 +45,13 @@ public async Task ReturnsEventsFromCorrectStreamOnly(string streamId, long expec
[InlineData("stream-2", 1)]
public async Task ReturnsEventsAccordingToQuery(string streamId, long expectedCount)
{
var firstEventTimestamp = await _dbContext.Events
.Where(e => e.StreamId == streamId)
.Select(e => e.Timestamp)
.FirstAsync();

var store = _fixture.BuildEventStore(_dbContext) as IEventStore<string, StringEvent, IEventMetadata<string>>;
var events = await store.Events(streamId, es => es.Where(e => e.Version > 1));
var events = await store.Events(streamId, es => es.Where(e => e.Timestamp > firstEventTimestamp));
Assert.Equal(expectedCount, events.Count());
}

Expand All @@ -58,9 +66,13 @@ public async Task ReturnsAllEvents()
[Fact]
public async Task ReturnsAllEventsAccordingToQuery()
{
var firstEventTimestamp = await _dbContext.Events
.Select(e => e.Timestamp)
.FirstAsync();

var store = _fixture.BuildEventStore(_dbContext) as IEventStore<string, StringEvent, IEventMetadata<string>>;
var events = await store.Events(es => es.Where(e => e.Version > 1));
Assert.Equal(3, events.Count());
var events = await store.Events(es => es.Where(e => e.Timestamp > firstEventTimestamp));
Assert.Equal(4, events.Count());
}
}
}
21 changes: 10 additions & 11 deletions src/RdbmsEventStore.EFCore.Tests/EventStoreTests/WriteEventTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public WriteEventTests(EventStoreFixture<Guid, Guid, GuidEvent, IEventMetadata<G
public async Task CommittingEventStoresEventInContext()
{
var store = _fixture.BuildEventStore(_dbContext);
await store.Append(Guid.NewGuid(), 0, new[] { new FooEvent { Foo = "Bar" } });
await store.Append(Guid.NewGuid(), null, new[] { new FooEvent { Foo = "Bar" } });
Assert.Equal(1, await _dbContext.Events.CountAsync());
}

Expand All @@ -28,10 +28,10 @@ public async Task CommittingWithOutOfSyncDataThrowsConflictException()
{
var store = _fixture.BuildEventStore(_dbContext);
var stream = Guid.NewGuid();
_dbContext.Events.AddRange(_fixture.EventFactory.Create(stream, 0, new[] { new FooEvent { Foo = "Bar" } }).Select(_fixture.EventSerializer.Serialize));
_dbContext.Events.AddRange(_fixture.EventFactory.Create(stream, new[] { new FooEvent { Foo = "Bar" } }).Select(_fixture.EventSerializer.Serialize));
await _dbContext.SaveChangesAsync();

await Assert.ThrowsAsync<ConflictException>(() => store.Append(stream, 0, new[] { new FooEvent { Foo = "Qux" } }));
await Assert.ThrowsAsync<ConflictException>(() => store.Append(stream, null, new[] { new FooEvent { Foo = "Qux" } }));
}

[Fact]
Expand All @@ -44,7 +44,7 @@ public async Task CommittingNoEventsExitsEarly() {
var store = _fixture.BuildEventStore(context.Object);

try {
await store.Append(stream, 0, new object[] { });
await store.Append(stream, null, new object[] { });
} catch (NotImplementedException) {
// Thrown by the mock DbSet if we try to query for existing events
// This indicates that we didn't exit early
Expand All @@ -61,7 +61,7 @@ public async Task CommittingMultipleEventsStoresAllEventsInContext()
var store = _fixture.BuildEventStore(_dbContext);

var events = new[] { new FooEvent { Foo = "Foo" }, new FooEvent { Foo = "Bar" } };
await store.Append(Guid.NewGuid(), 0, events);
await store.Append(Guid.NewGuid(), null, events);

Assert.Equal(2, await _dbContext.Events.CountAsync());
}
Expand All @@ -74,7 +74,7 @@ public async Task CommittingMultipleEventsStoresEventsInOrder()
var store = _fixture.BuildEventStore(_dbContext);

var events = new object[] { new FooEvent { Foo = "Foo" }, new BarEvent { Bar = "Bar" } };
await store.Append(Guid.NewGuid(), 0, events);
await store.Append(Guid.NewGuid(), null, events);

Assert.Collection(await _dbContext.Events.OrderBy(e => e.Version).ToListAsync(),
foo => Assert.Equal(typeof(FooEvent), _fixture.EventRegistry.TypeFor(foo.Type)),
Expand All @@ -88,12 +88,11 @@ public async Task CommittingMultipleEventsIncrementsVersionForEachEvent()

var store = _fixture.BuildEventStore(_dbContext);
var events = new object[] { new FooEvent { Foo = "Foo" }, new BarEvent { Bar = "Bar" } };
await store.Append(Guid.NewGuid(), 0, events);
await store.Append(Guid.NewGuid(), null, events);

var storedEvents = await _dbContext.Events.OrderBy(e => e.Timestamp).ToListAsync();
Assert.Collection(storedEvents,
foo => Assert.Equal(1, foo.Version),
bar => Assert.Equal(2, bar.Version));
Assert.Collection(await _dbContext.Events.OrderBy(e => e.Timestamp).ToListAsync(),
foo => Assert.Equal(typeof(FooEvent), _fixture.EventRegistry.TypeFor(foo.Type)),
bar => Assert.Equal(typeof(BarEvent), _fixture.EventRegistry.TypeFor(bar.Type)));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public async Task CanCommitEventsToStoreWithDefaultImplementations()
{
var store = _fixture.BuildEventStore(_dbContext);

await store.Append(1, 0, new[] { new FooEvent { Foo = "Bar" } });
await store.Append(1, null, new[] { new FooEvent { Foo = "Bar" } });
}

[Fact]
Expand Down
17 changes: 12 additions & 5 deletions src/RdbmsEventStore.EFCore/EFCoreEventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public async Task<IEnumerable<TEvent>> Events(Func<IQueryable<TEventMetadata>, I
return events;
}

public async Task Append(TStreamId streamId, long versionBefore, IEnumerable<object> payloads)
public async Task Append(TStreamId streamId, DateTimeOffset? versionBefore, IEnumerable<object> payloads)
{
if (!payloads.Any())
{
Expand All @@ -55,16 +55,23 @@ public async Task Append(TStreamId streamId, long versionBefore, IEnumerable<obj
{
var highestVersionNumber = await _dbContext.Events
.Where(e => e.StreamId.Equals(streamId))
.Select(e => e.Version)
.DefaultIfEmpty(0)
.MaxAsync();
.Select(e => e.Timestamp)
.DefaultIfEmpty(DateTimeOffset.MinValue)
.MaxAsync() as DateTimeOffset?;

if (highestVersionNumber == DateTimeOffset.MinValue)
{
highestVersionNumber = null;
}

if (highestVersionNumber != versionBefore)
{
throw new ConflictException(streamId, versionBefore, highestVersionNumber, payloads);
}

var events = _eventFactory.Create(streamId, versionBefore, payloads).Select(_eventSerializer.Serialize);
var events = _eventFactory.Create(streamId, payloads)
.Select(_eventSerializer.Serialize)
.VersionedPerTimestamp<TPersistedEvent, TStreamId>();
_dbContext.Events.AddRange(events);
await _dbContext.SaveChangesAsync();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Linq;
using System;
using System.Linq;
using System.Threading.Tasks;
using RdbmsEventStore.EntityFramework.Tests.Infrastructure;
using RdbmsEventStore.EntityFramework.Tests.TestData;
Expand All @@ -21,13 +22,13 @@ public ExtraMetaTests(ExtraMetaEventFactoryFixture fixture, AssemblyInitializerF
_fixture = fixture;
_dbContext = new EntityFrameworkEventStoreContext<ExtraMetaLongStringPersistedEventMetadata>();

var stream1 = _fixture.EventFactory.Create("stream-1", 0, new object[] {
var stream1 = _fixture.EventFactory.Create("stream-1", new object[] {
new FooEvent { Foo = "Foo" },
new BarEvent { Bar = "Bar" },
new FooEvent { Foo = "Baz" }
})
.Select(_fixture.EventSerializer.Serialize);
var stream2 = _fixture.EventFactory.Create("stream-2", 0, new object[] {
var stream2 = _fixture.EventFactory.Create("stream-2", new object[] {
new FooEvent { Foo = "Boo" },
new BarEvent { Bar = "Far" }
})
Expand Down Expand Up @@ -87,9 +88,9 @@ public class ExtraMetaEventFactory : DefaultEventFactory<string, ExtraMetaString
{
private int _total;

protected override ExtraMetaStringEvent CreateSingle(string streamId, long version, object payload)
protected override ExtraMetaStringEvent CreateSingle(string streamId, object payload)
{
var @event = base.CreateSingle(streamId, version, payload);
var @event = base.CreateSingle(streamId, payload);
@event.ExtraMeta = $"{payload.GetType().Name}-{_total++}";
return @event;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System.Linq;
using System.Data.Entity;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using RdbmsEventStore.EntityFramework.Tests.Infrastructure;
using RdbmsEventStore.EntityFramework.Tests.TestData;
Expand All @@ -10,23 +12,24 @@ public class QueryEventsTests : EventStoreTestBase<long, string, StringEvent, IE
{
public QueryEventsTests(EventStoreFixture<long, string, StringEvent, IEventMetadata<string>, LongStringPersistedEvent> fixture, AssemblyInitializerFixture initializer) : base(fixture, initializer)
{
var stream1 = _fixture.EventFactory.Create("stream-1", 0, new object[] {
new FooEvent { Foo = "Foo" },
new BarEvent { Bar = "Bar" },
new FooEvent { Foo = "Baz" }
})
.Select(_fixture.EventSerializer.Serialize);
var stream2 = _fixture.EventFactory.Create("stream-2", 0, new object[] {
new FooEvent { Foo = "Boo" },
new BarEvent { Bar = "Far" }
})
.Select(_fixture.EventSerializer.Serialize);
AddEvents("stream-1", new FooEvent { Foo = "Foo" });
Thread.Sleep(10); // to make timestamps differ between the first and the rest
AddEvents("stream-1", new BarEvent { Bar = "Bar" }, new FooEvent { Foo = "Baz" });
AddEvents("stream-2", new FooEvent { Foo = "Boo" });
Thread.Sleep(10);
AddEvents("stream-2", new BarEvent { Bar = "Far" });

_dbContext.Events.AddRange(stream1);
_dbContext.Events.AddRange(stream2);
_dbContext.SaveChanges();
}

private void AddEvents(string streamId, params object[] payloads)
{
var events = _fixture.EventFactory.Create(streamId, payloads)
.Select(_fixture.EventSerializer.Serialize)
.VersionedPerTimestamp<LongStringPersistedEvent, string>();
_dbContext.Events.AddRange(events);
}

[Theory]
[InlineData("stream-1", 3)]
[InlineData("stream-2", 2)]
Expand All @@ -42,8 +45,13 @@ public async Task ReturnsEventsFromCorrectStreamOnly(string streamId, long expec
[InlineData("stream-2", 1)]
public async Task ReturnsEventsAccordingToQuery(string streamId, long expectedCount)
{
var firstEventTimestamp = await _dbContext.Events
.Where(e => e.StreamId == streamId)
.Select(e => e.Timestamp)
.FirstAsync();

var store = _fixture.BuildEventStore(_dbContext) as IEventStore<string, StringEvent, IEventMetadata<string>>;
var events = await store.Events(streamId, es => es.Where(e => e.Version > 1));
var events = await store.Events(streamId, es => es.Where(e => e.Timestamp > firstEventTimestamp));
Assert.Equal(expectedCount, events.Count());
}

Expand All @@ -58,9 +66,13 @@ public async Task ReturnsAllEvents()
[Fact]
public async Task ReturnsAllEventsAccordingToQuery()
{
var firstEventTimestamp = await _dbContext.Events
.Select(e => e.Timestamp)
.FirstAsync();

var store = _fixture.BuildEventStore(_dbContext) as IEventStore<string, StringEvent, IEventMetadata<string>>;
var events = await store.Events(es => es.Where(e => e.Version > 1));
Assert.Equal(3, events.Count());
var events = await store.Events(es => es.Where(e => e.Timestamp > firstEventTimestamp));
Assert.Equal(4, events.Count());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public WriteEventTests(EventStoreFixture<Guid, Guid, GuidEvent, IEventMetadata<G
public async Task CommittingEventStoresEventInContext()
{
var store = _fixture.BuildEventStore(_dbContext);
await store.Append(Guid.NewGuid(), 0, new[] { new FooEvent { Foo = "Bar" } });
await store.Append(Guid.NewGuid(), null, new[] { new FooEvent { Foo = "Bar" } });
Assert.Equal(1, await _dbContext.Events.CountAsync());
}

Expand All @@ -29,10 +29,10 @@ public async Task CommittingWithOutOfSyncDataThrowsConflictException()
{
var store = _fixture.BuildEventStore(_dbContext);
var stream = Guid.NewGuid();
_dbContext.Events.AddRange(_fixture.EventFactory.Create(stream, 0, new[] { new FooEvent { Foo = "Bar" } }).Select(_fixture.EventSerializer.Serialize));
_dbContext.Events.AddRange(_fixture.EventFactory.Create(stream, new[] { new FooEvent { Foo = "Bar" } }).Select(_fixture.EventSerializer.Serialize));
await _dbContext.SaveChangesAsync();

await Assert.ThrowsAsync<ConflictException>(() => store.Append(stream, 0, new[] { new FooEvent { Foo = "Qux" } }));
await Assert.ThrowsAsync<ConflictException>(() => store.Append(stream, null, new[] { new FooEvent { Foo = "Qux" } }));
}

[Fact]
Expand All @@ -45,7 +45,7 @@ public async Task CommittingNoEventsExitsEarly() {
var store = _fixture.BuildEventStore(context.Object);

try {
await store.Append(stream, 0, new object[] { });
await store.Append(stream, null, new object[] { });
} catch (NotImplementedException) {
// Thrown by the mock DbSet if we try to query for existing events
// This indicates that we didn't exit early
Expand All @@ -62,7 +62,7 @@ public async Task CommittingMultipleEventsStoresAllEventsInContext()
var store = _fixture.BuildEventStore(_dbContext);

var events = new[] { new FooEvent { Foo = "Foo" }, new FooEvent { Foo = "Bar" } };
await store.Append(Guid.NewGuid(), 0, events);
await store.Append(Guid.NewGuid(), null, events);

Assert.Equal(2, await _dbContext.Events.CountAsync());
}
Expand All @@ -75,26 +75,11 @@ public async Task CommittingMultipleEventsStoresEventsInOrder()
var store = _fixture.BuildEventStore(_dbContext);

var events = new object[] { new FooEvent { Foo = "Foo" }, new BarEvent { Bar = "Bar" } };
await store.Append(Guid.NewGuid(), 0, events);
await store.Append(Guid.NewGuid(), null, events);

Assert.Collection(await _dbContext.Events.OrderBy(e => e.Version).ToListAsync(),
Assert.Collection(await _dbContext.Events.OrderBy(e => e.Timestamp).ToListAsync(),
foo => Assert.Equal(typeof(FooEvent), _fixture.EventRegistry.TypeFor(foo.Type)),
bar => Assert.Equal(typeof(BarEvent), _fixture.EventRegistry.TypeFor(bar.Type)));
}

[Fact]
public async Task CommittingMultipleEventsIncrementsVersionForEachEvent()
{
Assert.Empty(await _dbContext.Events.ToListAsync());

var store = _fixture.BuildEventStore(_dbContext);
var events = new object[] { new FooEvent { Foo = "Foo" }, new BarEvent { Bar = "Bar" } };
await store.Append(Guid.NewGuid(), 0, events);

var storedEvents = await _dbContext.Events.OrderBy(e => e.Timestamp).ToListAsync();
Assert.Collection(storedEvents,
foo => Assert.Equal(1, foo.Version),
bar => Assert.Equal(2, bar.Version));
}
}
}
Loading

0 comments on commit 93a2faf

Please sign in to comment.