diff --git a/src/JKang.EventSourcing.Abstractions/Domain/Aggregate.cs b/src/JKang.EventSourcing.Abstractions/Domain/Aggregate.cs index cec5555..7ead5f1 100644 --- a/src/JKang.EventSourcing.Abstractions/Domain/Aggregate.cs +++ b/src/JKang.EventSourcing.Abstractions/Domain/Aggregate.cs @@ -161,6 +161,7 @@ private void IntegrateSnapshot(IAggregateSnapshot snapshot) internal class Changeset : IAggregateChangeset { private readonly Aggregate _aggregate; + private bool _committed = false; public Changeset( Aggregate aggregate, @@ -182,6 +183,11 @@ public Changeset( public void Commit() { + if (_committed) + { + throw new InvalidOperationException("The changeset is already committed"); + } + for (int i = 0; i < Events.Count(); i++) { IAggregateEvent @evt = _aggregate._unsavedEvents.Dequeue(); @@ -189,6 +195,7 @@ public void Commit() } _aggregate._unsavedSnapshot = null; + _committed = true; } } } diff --git a/src/JKang.EventSourcing.Abstractions/Domain/IAggregateChangeset.cs b/src/JKang.EventSourcing.Abstractions/Domain/IAggregateChangeset.cs index 98c4580..dca8a63 100644 --- a/src/JKang.EventSourcing.Abstractions/Domain/IAggregateChangeset.cs +++ b/src/JKang.EventSourcing.Abstractions/Domain/IAggregateChangeset.cs @@ -1,5 +1,4 @@ using JKang.EventSourcing.Events; -using JKang.EventSourcing.Snapshotting; using System.Collections.Generic; namespace JKang.EventSourcing.Domain diff --git a/src/JKang.EventSourcing.Abstractions/Persistence/AggregateRepository.cs b/src/JKang.EventSourcing.Abstractions/Persistence/AggregateRepository.cs index 9433bca..12d171f 100644 --- a/src/JKang.EventSourcing.Abstractions/Persistence/AggregateRepository.cs +++ b/src/JKang.EventSourcing.Abstractions/Persistence/AggregateRepository.cs @@ -22,7 +22,7 @@ protected AggregateRepository( _snapshotStore = snapshotStore ?? throw new ArgumentNullException(nameof(snapshotStore)); } - protected async Task SaveAggregateAsync(TAggregate aggregate, + protected async Task> SaveAggregateAsync(TAggregate aggregate, CancellationToken cancellationToken = default) { if (aggregate is null) @@ -35,24 +35,17 @@ protected async Task SaveAggregateAsync(TAggregate aggregate, foreach (IAggregateEvent @event in changeset.Events) { await _eventStore.AddEventAsync(@event, cancellationToken).ConfigureAwait(false); - await OnEventSavedAsync(@event, cancellationToken).ConfigureAwait(false); } if (changeset.Snapshot != null) { await _snapshotStore.AddSnapshotAsync(changeset.Snapshot, cancellationToken).ConfigureAwait(false); - await OnSnapshotSavedAsync(changeset.Snapshot, cancellationToken).ConfigureAwait(false); } changeset.Commit(); + return changeset; } - protected virtual Task OnEventSavedAsync(IAggregateEvent e, - CancellationToken cancellationToken = default) => Task.CompletedTask; - - protected virtual Task OnSnapshotSavedAsync(IAggregateSnapshot snapshot, - CancellationToken cancellationToken = default) => Task.CompletedTask; - protected Task GetAggregateIdsAsync() { return _eventStore.GetAggregateIdsAsync();