Skip to content

Commit

Permalink
Merge pull request #56 from jacqueskang/feature/caching
Browse files Browse the repository at this point in the history
feat: load aggregate with specific version
  • Loading branch information
jacqueskang authored Feb 12, 2020
2 parents bd41d85 + bbab5d6 commit 3b6f9e5
Show file tree
Hide file tree
Showing 26 changed files with 369 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
<ItemGroup>
<PackageReference Include="AutoFixture.Xunit2" Version="4.11.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.4.0" />
<PackageReference Include="Moq" Version="4.13.1" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.1">
<PrivateAssets>all</PrivateAssets>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
using AutoFixture.Xunit2;
using JKang.EventSourcing.Events;
using JKang.EventSourcing.Persistence;
using JKang.EventSourcing.Snapshotting.Persistence;
using JKang.EventSourcing.TestingFixtures;
using Moq;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Xunit;

namespace JKang.EventSourcing.Abstractions.Tests.Persistence
{
public class AggregateRepositoryTest
{
private readonly Mock<IEventStore<GiftCard, Guid>> _eventStore;
private readonly Mock<ISnapshotStore<GiftCard, Guid>> _snapshotStore;
private readonly GiftCardRepository _sut;

public AggregateRepositoryTest()
{
_eventStore = new Mock<IEventStore<GiftCard, Guid>>();
_snapshotStore = new Mock<ISnapshotStore<GiftCard, Guid>>();
_sut = new GiftCardRepository(_eventStore.Object, _snapshotStore.Object);
}

[Theory, AutoData]
public async Task FindAggregateAsync_NoEvent_ReturnNull(Guid id)
{
// arrange
_eventStore
.Setup(x => x.GetEventsAsync(id, 1, int.MaxValue, It.IsAny<CancellationToken>()))
.ReturnsAsync(new IAggregateEvent<Guid>[] { });

// act
GiftCard actual = await _sut.FindGiftCardAsync(id);

// assert
Assert.Null(actual);
}

[Theory, AutoData]
public async Task FindAggregateAsync_NoSnapshot_HappyPath(Guid id)
{
// arrange
_eventStore
.Setup(x => x.GetEventsAsync(id, 1, int.MaxValue, It.IsAny<CancellationToken>()))
.ReturnsAsync(new IAggregateEvent<Guid>[] {
new GiftCardCreated(id, DateTime.UtcNow.AddDays(-2), 100),
new GiftCardDebited(id, 2, DateTime.UtcNow.AddDays(-1), 30)
});

// act
GiftCard actual = await _sut.FindGiftCardAsync(id);

// assert
Assert.NotNull(actual);
Assert.Equal(2, actual.Version);
Assert.Null(actual.Snapshot);
Assert.Equal(70, actual.Balance);
}

[Theory, AutoData]
public async Task FindAggregateAsync_IgnoreSnapshot_HappyPath(Guid id)
{
_eventStore
.Setup(x => x.GetEventsAsync(id, 1, int.MaxValue, It.IsAny<CancellationToken>()))
.ReturnsAsync(new IAggregateEvent<Guid>[] {
new GiftCardCreated(id, DateTime.UtcNow.AddDays(-2), 100),
new GiftCardDebited(id, 2, DateTime.UtcNow.AddDays(-1), 30)
});

// act
GiftCard actual = await _sut.FindGiftCardAsync(id, ignoreSnapshot: true);

// assert
Assert.NotNull(actual);
Assert.Equal(70, actual.Balance);
_snapshotStore.Verify(x => x.FindLastSnapshotAsync(It.IsAny<Guid>(), It.IsAny<int>(), It.IsAny<CancellationToken>()), Times.Never);
}

[Theory, AutoData]
public async Task FindAggregateAsync_WithSnapshot_HappyPath(Guid id, int snapshotVersion)
{
// arrange
_snapshotStore
.Setup(x => x.FindLastSnapshotAsync(id, int.MaxValue, It.IsAny<CancellationToken>()))
.ReturnsAsync(new GiftCardSnapshot(id, snapshotVersion, 100));
_eventStore
.Setup(x => x.GetEventsAsync(id, snapshotVersion + 1, int.MaxValue, It.IsAny<CancellationToken>()))
.ReturnsAsync(new IAggregateEvent<Guid>[] {
new GiftCardDebited(id, snapshotVersion + 1, DateTime.UtcNow.AddDays(-1), 30)
});

// act
GiftCard actual = await _sut.FindGiftCardAsync(id);

// assert
Assert.NotNull(actual);
Assert.Equal(snapshotVersion + 1, actual.Version);
Assert.NotNull(actual.Snapshot);
Assert.Equal(70, actual.Balance);
}

[Theory, AutoData]
public async Task FindAggregateAsync_WithVersion_WithSnapshot_HappyPath(Guid id, int version)
{
// arrange
_snapshotStore
.Setup(x => x.FindLastSnapshotAsync(id, version, It.IsAny<CancellationToken>()))
.ReturnsAsync(new GiftCardSnapshot(id, version - 2, 100));
_eventStore
.Setup(x => x.GetEventsAsync(id, version - 1, version, It.IsAny<CancellationToken>()))
.ReturnsAsync(new IAggregateEvent<Guid>[] {
new GiftCardDebited(id, version - 1, DateTime.UtcNow.AddDays(-1), 30),
new GiftCardDebited(id, version, DateTime.UtcNow, 30)
});

// act
GiftCard actual = await _sut.FindGiftCardAsync(id, version: version);

// assert
Assert.NotNull(actual);
Assert.Equal(40, actual.Balance);
}

[Theory, AutoData]
public async Task FindAggregateAsync_WithVersion_WithoutSnapshot_HappyPath(Guid id)
{
// arrange
_eventStore
.Setup(x => x.GetEventsAsync(id, 1, 3, It.IsAny<CancellationToken>()))
.ReturnsAsync(new IAggregateEvent<Guid>[] {
new GiftCardCreated(id, DateTime.UtcNow.AddDays(-2), 100),
new GiftCardDebited(id, 2, DateTime.UtcNow.AddDays(-1), 30),
new GiftCardDebited(id, 3, DateTime.UtcNow, 30)
});

// act
GiftCard actual = await _sut.FindGiftCardAsync(id, version: 3);

// assert
Assert.NotNull(actual);
Assert.Null(actual.Snapshot);
Assert.Equal(40, actual.Balance);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,22 @@ protected virtual Task<TKey[]> GetAggregateIdsAsync()

protected virtual async Task<TAggregate> FindAggregateAsync(TKey id,
bool ignoreSnapshot = false,
int version = -1,
CancellationToken cancellationToken = default)
{
int maxVersion = version <= 0 ? int.MaxValue : version;

IAggregateSnapshot<TKey> snapshot = null;
if (!ignoreSnapshot)
{
snapshot = await _snapshotStore
.FindLastSnapshotAsync(id, cancellationToken)
.FindLastSnapshotAsync(id, maxVersion, cancellationToken)
.ConfigureAwait(false);
}

int minVersion = snapshot == null ? 1 : snapshot.AggregateVersion + 1;
IAggregateEvent<TKey>[] events = await _eventStore
.GetEventsAsync(id, snapshot == null ? 0 : snapshot.AggregateVersion, cancellationToken)
.GetEventsAsync(id, minVersion, maxVersion, cancellationToken)
.ConfigureAwait(false);

if (snapshot == null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ Task AddEventAsync(IAggregateEvent<TKey> e,
CancellationToken cancellationToken = default);

Task<IAggregateEvent<TKey>[]> GetEventsAsync(TKey aggregateId,
int skip = 0,
int minVersion, int maxVersion,
CancellationToken cancellationToken = default);

Task<TKey[]> GetAggregateIdsAsync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ public Task AddSnapshotAsync(IAggregateSnapshot<TKey> snapshot, CancellationToke
return Task.CompletedTask;
}

public Task<IAggregateSnapshot<TKey>> FindLastSnapshotAsync(TKey aggregateId, CancellationToken cancellationToken = default)
public Task<IAggregateSnapshot<TKey>> FindLastSnapshotAsync(TKey aggregateId, int maxVersion,
CancellationToken cancellationToken = default)
{
return Task.FromResult(null as IAggregateSnapshot<TKey>);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public interface ISnapshotStore<TAggregate, TKey>
Task AddSnapshotAsync(IAggregateSnapshot<TKey> snapshot,
CancellationToken cancellationToken = default);

Task<IAggregateSnapshot<TKey>> FindLastSnapshotAsync(TKey aggregateId,
Task<IAggregateSnapshot<TKey>> FindLastSnapshotAsync(TKey aggregateId, int maxVersion,
CancellationToken cancellationToken = default);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,20 @@ protected CachedAggregateRepository(
protected override async Task<TAggregate> FindAggregateAsync(
TKey id,
bool ignoreSnapshot = false,
int version = -1,
CancellationToken cancellationToken = default)
{
string key = GetCacheKey(id);
string key = GetCacheKey(id, version);
string serialized = await _cache.GetStringAsync(key, cancellationToken).ConfigureAwait(false);
if (!string.IsNullOrEmpty(serialized))
{
return DeserializeAggregate(serialized);
}

TAggregate aggregate = await base.FindAggregateAsync(id, ignoreSnapshot, cancellationToken)
TAggregate aggregate = await base.FindAggregateAsync(id, ignoreSnapshot, version, cancellationToken)
.ConfigureAwait(false);

await CacheAsync(aggregate, cancellationToken).ConfigureAwait(false);
await CacheAsync(aggregate, version, cancellationToken).ConfigureAwait(false);

return aggregate;
}
Expand All @@ -71,20 +72,20 @@ protected override async Task<IAggregateChangeset<TKey>> SaveAggregateAsync(
IAggregateChangeset<TKey> changeset = await base.SaveAggregateAsync(aggregate, cancellationToken)
.ConfigureAwait(false);

await CacheAsync(aggregate, cancellationToken).ConfigureAwait(false);
await CacheAsync(aggregate, -1, cancellationToken).ConfigureAwait(false);

return changeset;
}

private async Task CacheAsync(TAggregate aggregate,
private async Task CacheAsync(TAggregate aggregate, int version,
CancellationToken cancellationToken)
{
string serialized = SerializeAggregate(aggregate);
string key = GetCacheKey(aggregate.Id);
string key = GetCacheKey(aggregate.Id, version);
await _cache.SetStringAsync(key, serialized, _cacheOptions, cancellationToken).ConfigureAwait(false);
}

protected virtual string GetCacheKey(TKey id) => $"{typeof(TAggregate).FullName}_{id}";
protected virtual string GetCacheKey(TKey id, int version) => $"{typeof(TAggregate).FullName}_{id}_{version}";

protected virtual TAggregate DeserializeAggregate(string serialized)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,16 @@ public async Task<TKey[]> GetAggregateIdsAsync(

public async Task<IAggregateEvent<TKey>[]> GetEventsAsync(
TKey aggregateId,
int skip = 0,
int minVersion,
int maxVersion,
CancellationToken cancellationToken = default)
{
string query = $@"
SELECT VALUE c.data
FROM c
WHERE c.data.aggregateId = '{aggregateId}' AND c.data.aggregateVersion > {skip}
WHERE c.data.aggregateId = '{aggregateId}' AND
c.data.aggregateVersion >= {minVersion} AND
c.data.aggregateVersion <= {maxVersion}
ORDER BY c.data.aggregateVersion";
FeedIterator<IAggregateEvent<TKey>> iterator = _container
.GetItemQueryIterator<IAggregateEvent<TKey>>(new QueryDefinition(query));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@ await _container
.ConfigureAwait(false);
}

public async Task<IAggregateSnapshot<TKey>> FindLastSnapshotAsync(TKey aggregateId,
public async Task<IAggregateSnapshot<TKey>> FindLastSnapshotAsync(TKey aggregateId, int maxVersion,
CancellationToken cancellationToken = default)
{
string query = $@"
SELECT TOP 1 VALUE c.data
FROM c
WHERE c.data.aggregateId = '{aggregateId}'
WHERE c.data.aggregateId = '{aggregateId}' AND c.data.aggregateVersion <= {maxVersion}
ORDER BY c.data.aggregateVersion DESC";

FeedIterator<IAggregateSnapshot<TKey>> iterator = _container
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,13 @@ public async Task<TKey[]> GetAggregateIdsAsync(

public async Task<IAggregateEvent<TKey>[]> GetEventsAsync(
TKey aggregateId,
int skip = 0,
int minVersion,
int maxVersion,
CancellationToken cancellationToken = default)
{
Search search = _table.Query(aggregateId as dynamic, new QueryFilter("aggregateVersion", QueryOperator.GreaterThan, skip));
var filter = new QueryFilter("aggregateVersion", QueryOperator.GreaterThanOrEqual, minVersion);
filter.AddCondition("aggregateVersion", QueryOperator.LessThanOrEqual, maxVersion);
Search search = this._table.Query(aggregateId as dynamic, filter);

var events = new List<IAggregateEvent<TKey>>();
do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,15 @@ public async Task AddSnapshotAsync(IAggregateSnapshot<TKey> snapshot,
await _table.PutItemAsync(item, cancellationToken).ConfigureAwait(false);
}

public async Task<IAggregateSnapshot<TKey>> FindLastSnapshotAsync(TKey aggregateId,
public async Task<IAggregateSnapshot<TKey>> FindLastSnapshotAsync(TKey aggregateId, int maxVersion,
CancellationToken cancellationToken = default)
{

var filter = new QueryFilter("aggregateId", QueryOperator.Equal, aggregateId as dynamic);
filter.AddCondition("aggregateVersion", QueryOperator.LessThanOrEqual, maxVersion);
Search search = _table.Query(new QueryOperationConfig
{
Filter = new QueryFilter("aggregateId", QueryOperator.Equal, aggregateId as dynamic),
Filter = filter,
Limit = 1,
BackwardSearch = true,
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,14 @@ public Task<TKey[]> GetAggregateIdsAsync(

public async Task<IAggregateEvent<TKey>[]> GetEventsAsync(
TKey aggregateId,
int skip = 0,
int minVersion, int maxVersion,
CancellationToken cancellationToken = default)
{
List<string> serializedEvents = await _context.GetEventDbSet()
.Where(x => x.AggregateId.Equals(aggregateId))
.Where(x => x.AggregateVersion >= minVersion)
.Where(x => x.AggregateVersion <= maxVersion)
.OrderBy(x => x.AggregateVersion)
.Skip(skip)
.Select(x => x.Serialized)
.ToListAsync(cancellationToken)
.ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,12 @@ public async Task AddSnapshotAsync(IAggregateSnapshot<TKey> snapshot,
await _context.SaveChangesAsync(cancellationToken).ConfigureAwait(false);
}

public async Task<IAggregateSnapshot<TKey>> FindLastSnapshotAsync(TKey aggregateId,
public async Task<IAggregateSnapshot<TKey>> FindLastSnapshotAsync(TKey aggregateId, int maxVersion,
CancellationToken cancellationToken = default)
{
string serialized = await _context.GetSnapshotDbSet()
.Where(x => x.AggregateId.Equals(aggregateId))
.Where(x => x.AggregateVersion <= maxVersion)
.OrderByDescending(x => x.AggregateVersion)
.Select(x => x.Serialized)
.FirstOrDefaultAsync(cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public Task AddSnapshotAsync(IAggregateSnapshot<TKey> snapshot,
return Task.CompletedTask;
}

public Task<IAggregateSnapshot<TKey>> FindLastSnapshotAsync(TKey aggregateId,
public Task<IAggregateSnapshot<TKey>> FindLastSnapshotAsync(TKey aggregateId, int maxVersion,
CancellationToken cancellationToken = default)
{
if (!Directory.Exists(_options.Folder))
Expand All @@ -54,6 +54,7 @@ public Task<IAggregateSnapshot<TKey>> FindLastSnapshotAsync(TKey aggregateId,
.Select(x => Path.GetFileNameWithoutExtension(x))
.Select(x => x.Split('.').LastOrDefault())
.Select(x => int.TryParse(x, NumberStyles.Integer, CultureInfo.InvariantCulture, out int version) ? version : -1)
.Where(x => x <= maxVersion)
.OrderByDescending(x => x)
.FirstOrDefault();

Expand All @@ -70,6 +71,6 @@ public Task<IAggregateSnapshot<TKey>> FindLastSnapshotAsync(TKey aggregateId,
}

private string GetFilePath(TKey aggregateId, int version)
=> Path.Combine(_options.Folder, $"{aggregateId}.{version.ToString(CultureInfo.InvariantCulture)}.snapshot");
=> Path.Combine(_options.Folder, $"{aggregateId}.{version.ToString(CultureInfo.InvariantCulture)}.snapshot");
}
}
Loading

0 comments on commit 3b6f9e5

Please sign in to comment.