Skip to content

Commit

Permalink
Separate event from storage (#26)
Browse files Browse the repository at this point in the history
* Separate the concepts of an event, and the storage type for it

* Rename Commit->Append

* Increment version; breaking changes
  • Loading branch information
Tomas Lycken committed Oct 25, 2017
1 parent ec63c4a commit c8feaac
Show file tree
Hide file tree
Showing 36 changed files with 487 additions and 284 deletions.
2 changes: 1 addition & 1 deletion GitVersion.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
mode: ContinuousDelivery
next-version: 0.3.0
next-version: 0.4.0
branches:
master:
tag: beta
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
using System.Linq;
using System.Threading.Tasks;
using RdbmsEventStore.EntityFramework.Tests.Infrastructure;
using RdbmsEventStore.EntityFramework.Tests.TestData;
using RdbmsEventStore.EventRegistry;
using RdbmsEventStore.Serialization;
using Xunit;

namespace RdbmsEventStore.EntityFramework.Tests.EventStoreTests
{
[Collection(nameof(InMemoryDatabaseCollection))]
public class ExtraMetaTests : IClassFixture<ExtraMetaEventFactoryFixture>
{
private readonly ExtraMetaEventFactoryFixture _fixture;
private readonly EventStoreContext<ExtraMetaLongStringPersistedEventMetadata> _dbContext;

// ReSharper disable once UnusedParameter.Local
public ExtraMetaTests(ExtraMetaEventFactoryFixture fixture, AssemblyInitializerFixture _)
{
EffortProviderFactory.ResetDb();
_fixture = fixture;
_dbContext = new EventStoreContext<ExtraMetaLongStringPersistedEventMetadata>();

var stream1 = _fixture.EventFactory.Create("stream-1", 0, new object[] {
new FooEvent { Foo = "Foo" },
new BarEvent { Bar = "Bar" },
new FooEvent { Foo = "Baz" }
})
.Select(_fixture.EventSerializer.Serialize);
var stream2 = _fixture.EventFactory.Create("stream-2", 0, new object[] {
new FooEvent { Foo = "Boo" },
new BarEvent { Bar = "Far" }
})
.Select(_fixture.EventSerializer.Serialize);

_dbContext.Events.AddRange(stream1);
_dbContext.Events.AddRange(stream2);
_dbContext.SaveChanges();
}

[Theory]
[InlineData("stream-1", 3)]
[InlineData("stream-2", 2)]
public async Task ReturnsEventsFromCorrectStreamOnly(string streamId, long expectedCount)
{
var store = _fixture.BuildEventStore(_dbContext) as IEventStream<string, ExtraMetaStringEvent, IExtraMeta>;
var events = await store.Events(streamId);
Assert.Equal(expectedCount, events.Count());
}

[Theory]
[InlineData("stream-1", 2)]
[InlineData("stream-2", 1)]
public async Task ReturnsEventsAccordingToQuery(string streamId, long expectedCount)
{
var store = _fixture.BuildEventStore(_dbContext) as IEventStream<string, ExtraMetaStringEvent, IExtraMeta>;
var events = await store.Events(streamId, es => es.Where(e => e.ExtraMeta.StartsWith("Foo")));
Assert.Equal(expectedCount, events.Count());
}

[Theory]
[InlineData("stream-1", 2)]
[InlineData("stream-2", 1)]
public async Task ReturnsEventsWithMetadata(string streamId, long expectedCount)
{
var store = _fixture.BuildEventStore(_dbContext) as IEventStream<string, ExtraMetaStringEvent, IExtraMeta>;
var events = await store.Events(streamId, es => es.Where(e => e.ExtraMeta.StartsWith("Foo")));
Assert.All(events, @event => Assert.StartsWith("Foo", @event.ExtraMeta));
}

[Theory]
[InlineData("stream-1", 2)]
[InlineData("stream-2", 1)]
public async Task CanQueryByExtraMetadata(string streamId, long expectedCount)
{
var store = _fixture.BuildEventStore(_dbContext) as IEventStream<string, ExtraMetaStringEvent, IExtraMeta>;
var events = await store.Events(streamId, es => es.Where(e => e.ExtraMeta.StartsWith("Foo")));
Assert.Equal(expectedCount, events.Count());
}
}

public class ExtraMetaEventFactory : DefaultEventFactory<string, ExtraMetaStringEvent>
{
private int _total;

protected override ExtraMetaStringEvent CreateSingle(string streamId, long version, object payload)
{
var @event = base.CreateSingle(streamId, version, payload);
@event.ExtraMeta = $"{payload.GetType().Name}-{_total++}";
return @event;
}
}

public class ExtraMetaEventSerializer : DefaultEventSerializer<string, ExtraMetaStringEvent, ExtraMetaLongStringPersistedEventMetadata>
{
public ExtraMetaEventSerializer(IEventRegistry registry) : base(registry)
{
}

public override ExtraMetaLongStringPersistedEventMetadata Serialize(ExtraMetaStringEvent @event)
{
var serialized = base.Serialize(@event);
serialized.ExtraMeta = @event.ExtraMeta;
return serialized;
}

public override ExtraMetaStringEvent Deserialize(ExtraMetaLongStringPersistedEventMetadata @event)
{
var deserialized = base.Deserialize(@event);
deserialized.ExtraMeta = @event.ExtraMeta;
return deserialized;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,33 @@

namespace RdbmsEventStore.EntityFramework.Tests.EventStoreTests
{
public class QueryEventsTests : EventStoreTestBase<long, string, LongStringEvent>
public class QueryEventsTests : EventStoreTestBase<long, string, StringEvent, IEventMetadata<string>, LongStringPersistedEvent>
{
public QueryEventsTests(EventStoreFixture<long, string, LongStringEvent> fixture, AssemblyInitializerFixture initializer) : base(fixture, initializer)
public QueryEventsTests(EventStoreFixture<long, string, StringEvent, IEventMetadata<string>, LongStringPersistedEvent> fixture, AssemblyInitializerFixture initializer) : base(fixture, initializer)
{
_dbContext.Events.AddRange(_fixture.EventFactory.Create("stream-1", 0,
new FooEvent{Foo = "Foo"},
new BarEvent{Bar = "Bar"},
new FooEvent{Foo = "Baz"}));
_dbContext.Events.AddRange(_fixture.EventFactory.Create("stream-2", 0,
new FooEvent {Foo = "Boo"},
new BarEvent {Bar = "Far"}));
var stream1 = _fixture.EventFactory.Create("stream-1", 0, new object[] {
new FooEvent { Foo = "Foo" },
new BarEvent { Bar = "Bar" },
new FooEvent { Foo = "Baz" }
})
.Select(_fixture.EventSerializer.Serialize);
var stream2 = _fixture.EventFactory.Create("stream-2", 0, new object[] {
new FooEvent { Foo = "Boo" },
new BarEvent { Bar = "Far" }
})
.Select(_fixture.EventSerializer.Serialize);

_dbContext.Events.AddRange(stream1);
_dbContext.Events.AddRange(stream2);
_dbContext.SaveChanges();
}

[Theory]
[InlineData("stream-1", 3)]
[InlineData("stream-2", 2)]
public async Task ReturnsEventsFromCorrectStreamOnly(string streamId, long expectedCount)
{
var store = _fixture.BuildEventStore(_dbContext);
var store = _fixture.BuildEventStore(_dbContext) as IEventStream<string, StringEvent, IEventMetadata<string>>;
var events = await store.Events(streamId);
Assert.Equal(expectedCount, events.Count());
}
Expand All @@ -35,10 +42,9 @@ public async Task ReturnsEventsFromCorrectStreamOnly(string streamId, long expec
[InlineData("stream-2", 1)]
public async Task ReturnsEventsAccordingToQuery(string streamId, long expectedCount)
{
var store = _fixture.BuildEventStore(_dbContext);
var events = await store.Events(streamId, es => es.Where(e => e.Type == "FooEvent"));
var store = _fixture.BuildEventStore(_dbContext) as IEventStream<string, StringEvent, IEventMetadata<string>>;
var events = await store.Events(streamId, es => es.Where(e => e.Version > 1));
Assert.Equal(expectedCount, events.Count());
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@

namespace RdbmsEventStore.EntityFramework.Tests.EventStoreTests
{
public class WriteEventTests : EventStoreTestBase<Guid, Guid, GuidGuidEvent>
public class WriteEventTests : EventStoreTestBase<Guid, Guid, GuidEvent, IEventMetadata<Guid>, GuidGuidPersistedEvent>
{
public WriteEventTests(EventStoreFixture<Guid, Guid, GuidGuidEvent> fixture, AssemblyInitializerFixture initializer) : base(fixture, initializer)
public WriteEventTests(EventStoreFixture<Guid, Guid, GuidEvent, IEventMetadata<Guid>, GuidGuidPersistedEvent> fixture, AssemblyInitializerFixture initializer) : base(fixture, initializer)
{
}

[Fact]
public async Task CommittingEventStoresEventInContext()
{
var store = _fixture.BuildEventStore(_dbContext);
await store.Commit(Guid.NewGuid(), 0, new FooEvent { Foo = "Bar" });
await store.Append(Guid.NewGuid(), 0, new[] { new FooEvent { Foo = "Bar" } });
Assert.Equal(1, await _dbContext.Events.CountAsync());
}

Expand All @@ -28,10 +28,10 @@ public async Task CommittingWithOutOfSyncDataThrowsConflictException()
{
var store = _fixture.BuildEventStore(_dbContext);
var stream = Guid.NewGuid();
_dbContext.Events.AddRange(_fixture.EventFactory.Create(stream, 0, new FooEvent { Foo = "Bar" }));
_dbContext.Events.AddRange(_fixture.EventFactory.Create(stream, 0, new[] { new FooEvent { Foo = "Bar" } }).Select(_fixture.EventSerializer.Serialize));
await _dbContext.SaveChangesAsync();

await Assert.ThrowsAsync<ConflictException>(() => store.Commit(stream, 0, new FooEvent { Foo = "Qux" }));
await Assert.ThrowsAsync<ConflictException>(() => store.Append(stream, 0, new[] { new FooEvent { Foo = "Qux" } }));
}

[Fact]
Expand All @@ -41,7 +41,8 @@ public async Task CommittingMultipleEventsStoresAllEventsInContext()

var store = _fixture.BuildEventStore(_dbContext);

await store.Commit(Guid.NewGuid(), 0, new FooEvent { Foo = "Foo" }, new FooEvent { Foo = "Bar" });
var events = new[] { new FooEvent { Foo = "Foo" }, new FooEvent { Foo = "Bar" } };
await store.Append(Guid.NewGuid(), 0, events);

Assert.Equal(2, await _dbContext.Events.CountAsync());
}
Expand All @@ -53,7 +54,8 @@ public async Task CommittingMultipleEventsStoresEventsInOrder()

var store = _fixture.BuildEventStore(_dbContext);

await store.Commit(Guid.NewGuid(), 0, new FooEvent { Foo = "Foo" }, new BarEvent { Bar = "Bar" });
var events = new object[] { new FooEvent { Foo = "Foo" }, new BarEvent { Bar = "Bar" } };
await store.Append(Guid.NewGuid(), 0, events);

Assert.Collection(await _dbContext.Events.OrderBy(e => e.Version).ToListAsync(),
foo => Assert.Equal(typeof(FooEvent), _fixture.EventRegistry.TypeFor(foo.Type)),
Expand All @@ -66,11 +68,11 @@ public async Task CommittingMultipleEventsIncrementsVersionForEachEvent()
Assert.Empty(await _dbContext.Events.ToListAsync());

var store = _fixture.BuildEventStore(_dbContext);
var events = new object[] { new FooEvent { Foo = "Foo" }, new BarEvent { Bar = "Bar" } };
await store.Append(Guid.NewGuid(), 0, events);

await store.Commit(Guid.NewGuid(), 0, new FooEvent { Foo = "Foo" }, new BarEvent { Bar = "Bar" });

var events = await _dbContext.Events.OrderBy(e => e.Timestamp).ToListAsync();
Assert.Collection(events,
var storedEvents = await _dbContext.Events.OrderBy(e => e.Timestamp).ToListAsync();
Assert.Collection(storedEvents,
foo => Assert.Equal(1, foo.Version),
bar => Assert.Equal(2, bar.Version));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,21 @@
using System.Threading.Tasks;
using RdbmsEventStore.EntityFramework.Tests.Infrastructure;
using RdbmsEventStore.EntityFramework.Tests.TestData;
using RdbmsEventStore.Serialization;
using Xunit;

namespace RdbmsEventStore.EntityFramework.Tests.ExtensibilityTests
{
public class NonDefaultEvent : IMutableEvent<long, long>
public class NonDefaultEvent : IMutableEvent<long>
{
public DateTimeOffset Timestamp { get; set; }
public long StreamId { get; set; }
public long Version { get; set; }
public Type Type { get; set; }
public object Payload { get; set; }
}

public class NonDefaultPersistedEvent : IPersistedEvent<long>
{
[Key]
[DatabaseGenerated(DatabaseGeneratedOption.Identity)]
Expand All @@ -28,18 +38,18 @@ public class NonDefaultEvent : IMutableEvent<long, long>
public byte[] Payload { get; set; }
}

public class NonDefaultContext : DbContext, IEventDbContext<NonDefaultEvent>
public class NonDefaultContext : DbContext, IEventDbContext<NonDefaultPersistedEvent>
{
public DbSet<NonDefaultEvent> Events { get; set; }
public DbSet<NonDefaultPersistedEvent> Events { get; set; }
}

[Collection(nameof(InMemoryDatabaseCollection))]
public class NonDefaultImplementationsTests : IClassFixture<EventStoreFixture<long, long, NonDefaultEvent>>, IDisposable
public class NonDefaultImplementationsTests : IClassFixture<EventStoreFixture<long, long, NonDefaultEvent, IEventMetadata<long>, NonDefaultPersistedEvent>>, IDisposable
{
private readonly EventStoreFixture<long, long, NonDefaultEvent> _fixture;
private readonly EventStoreFixture<long, long, NonDefaultEvent, IEventMetadata<long>, NonDefaultPersistedEvent> _fixture;
private readonly NonDefaultContext _dbContext;

public NonDefaultImplementationsTests(EventStoreFixture<long, long, NonDefaultEvent> fixture)
public NonDefaultImplementationsTests(EventStoreFixture<long, long, NonDefaultEvent, IEventMetadata<long>, NonDefaultPersistedEvent> fixture)
{
EffortProviderFactory.ResetDb();
_fixture = fixture;
Expand All @@ -51,26 +61,26 @@ public async Task CanCommitEventsToStoreWithDefaultImplementations()
{
var store = _fixture.BuildEventStore(_dbContext);

await store.Commit(1, 0, new FooEvent { Foo = "Bar" });
await store.Append(1, 0, new[] { new FooEvent { Foo = "Bar" } });
}

[Fact]
public async Task CanReadEventsFromStoreWithNonDefaultImplementations()
{
_dbContext.Events.AddRange(new[]
{
new NonDefaultEvent
new NonDefaultPersistedEvent
{
StreamId = 1,
Timestamp = DateTimeOffset.UtcNow,
Version = 1,
Type = "FooEvent",
Payload =Encoding.UTF8.GetBytes(@"{""Foo"":""Bar""}")
Payload = Encoding.UTF8.GetBytes(@"{""Foo"":""Bar""}")
}
});
await _dbContext.SaveChangesAsync();

var store = _fixture.BuildEventStore(_dbContext);
var store = _fixture.BuildEventStore(_dbContext) as IEventStream<long, NonDefaultEvent, IEventMetadata<long>>;

var events = await store.Events(1);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,29 +1,43 @@
using System;
using System.Data.Entity;
using RdbmsEventStore.EntityFramework.Tests.EventStoreTests;
using RdbmsEventStore.EntityFramework.Tests.TestData;
using RdbmsEventStore.EventRegistry;
using RdbmsEventStore.Serialization;

namespace RdbmsEventStore.EntityFramework.Tests.Infrastructure
{
public class EventStoreFixture<TId, TStreamId, TEvent>
public class EventStoreFixture<TId, TStreamId, TEvent, TEventMetadata, TPersistedEvent>
where TId : IEquatable<TId>
where TStreamId : IEquatable<TStreamId>
where TEvent : class, IMutableEvent<TId, TStreamId>, new()
where TEvent : class, TEventMetadata, IMutableEvent<TStreamId>, new()
where TPersistedEvent : class, TEventMetadata, IPersistedEvent<TStreamId>, new()
where TEventMetadata : IEventMetadata<TStreamId>
{
public EventStoreFixture()
{
EventRegistry = new AssemblyEventRegistry(typeof(TEvent), type => type.Name, type => !type.Name.StartsWith("<>"));
EventSerializer = new DefaultEventSerializer();
EventFactory = new DefaultEventFactory<TId, TStreamId, TEvent>(EventRegistry, EventSerializer);
EventSerializer = new DefaultEventSerializer<TStreamId, TEvent, TPersistedEvent>(EventRegistry);
EventFactory = new DefaultEventFactory<TStreamId, TEvent>();
WriteLock = new WriteLock();
}

public IEventRegistry EventRegistry { get; }
public IEventSerializer EventSerializer { get; }
public DefaultEventFactory<TId, TStreamId, TEvent> EventFactory { get; }
public IWriteLock WriteLock { get; }
public IEventRegistry EventRegistry { get; protected set; }
public IEventSerializer<TEvent, TPersistedEvent> EventSerializer { get; protected set; }
public IEventFactory<TStreamId, TEvent> EventFactory { get; protected set; }
public IWriteLock WriteLock { get; protected set; }

public EntityFrameworkEventStore<TId, TStreamId, TEventStoreContext, TEvent> BuildEventStore<TEventStoreContext>(TEventStoreContext dbContext)
where TEventStoreContext : DbContext, IEventDbContext<TEvent>
=> new EntityFrameworkEventStore<TId, TStreamId, TEventStoreContext, TEvent>(dbContext, EventFactory, WriteLock);
public EntityFrameworkEventStore<TId, TStreamId, TEventStoreContext, TEvent, TEventMetadata, TPersistedEvent> BuildEventStore<TEventStoreContext>(TEventStoreContext dbContext)
where TEventStoreContext : DbContext, IEventDbContext<TPersistedEvent>
=> new EntityFrameworkEventStore<TId, TStreamId, TEventStoreContext, TEvent, TEventMetadata, TPersistedEvent>(dbContext, EventFactory, WriteLock, EventSerializer);
}

public class ExtraMetaEventFactoryFixture : EventStoreFixture<long, string, ExtraMetaStringEvent, IExtraMeta, ExtraMetaLongStringPersistedEventMetadata>
{
public ExtraMetaEventFactoryFixture()
{
EventFactory = new ExtraMetaEventFactory();
EventSerializer = new ExtraMetaEventSerializer(EventRegistry);
}
}
}
Loading

0 comments on commit c8feaac

Please sign in to comment.