Skip to content

Commit

Permalink
feat: use the transactional aggregate handler on the orleans aggregat…
Browse files Browse the repository at this point in the history
…e handler grain
  • Loading branch information
corstian committed Dec 15, 2024
1 parent a782fe4 commit 1e426f3
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@ public class AggregateHandlerGrain<TAggregate>

public AggregateHandlerGrain(
IServiceProvider services,
ILogger<AbstractAggregateHandlerGrain<TAggregate>> logger,
IDocumentStore store)
: base(services, logger)
: base(services)
{
_store = store;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public CommandMetadata ConvertFromSurrogate(in CommandMetadataSurrogate surrogat
AggregateId = surrogate.AggregateId,
AggregateType = surrogate.AggregateType,
CreatedAt = surrogate.CreatedAt,
Attributes = surrogate.Attributes,
ParentContext = surrogate.SourceActivity
};

Expand All @@ -20,6 +21,7 @@ public CommandMetadataSurrogate ConvertToSurrogate(in CommandMetadata value) =>
AggregateId = value.AggregateId,
AggregateType = value.AggregateType,
CreatedAt = value.CreatedAt,
Attributes = value.Attributes,
SourceActivity = value.ParentContext
};
}
Original file line number Diff line number Diff line change
@@ -1,39 +1,38 @@
using System.Diagnostics;
using FluentResults;
using FluentResults;
using Microsoft.Extensions.Logging;
using Orleans.Concurrency;
using Orleans.EventSourcing;
using Orleans.EventSourcing.CustomStorage;
using Orleans.Serialization.Invocation;
using Whaally.Domain.Abstractions;

namespace Whaally.Domain.Infrastructure.OrleansHost.Grains;

[MayInterleave(nameof(DoInterleave))]
public abstract class AbstractAggregateHandlerGrain<TAggregate> :
JournaledGrain<TAggregate, IEventEnvelope>,
ICustomStorageInterface<TAggregate, IEventEnvelope>,
IAggregateHandlerGrain<TAggregate>
where TAggregate : class, IAggregate, new()
{
public static bool DoInterleave(IInvokable req) => true;

private readonly IServiceProvider _services;
private readonly ILogger<AbstractAggregateHandlerGrain<TAggregate>> _logger;

protected IAggregateHandler<TAggregate> AggregateHandler;
protected TAggregate Aggregate = new();

public AbstractAggregateHandlerGrain(
IServiceProvider services,
ILogger<AbstractAggregateHandlerGrain<TAggregate>> logger)

public AbstractAggregateHandlerGrain(IServiceProvider services)
{
_services = services;
_logger = logger;


Aggregate = new();
AggregateHandler = new DefaultAggregateHandler<TAggregate>(_services, this.GetPrimaryKey().ToString())
AggregateHandler = new TransactionalAggregateHandler<TAggregate>(_services, this.GetPrimaryKey().ToString())
{
Aggregate = Aggregate
};
}

public override async Task OnActivateAsync(CancellationToken token)
{
await RefreshNow();
Expand All @@ -57,20 +56,9 @@ protected override void TransitionState(
*/
AggregateHandler.Apply(eventEnvelope);
}

public async Task<IResult<IEventEnvelope>> Evaluate(ICommandEnvelope commandEnvelope)
{
var result = await AggregateHandler.Evaluate(
commandEnvelope.Messages.ToArray());

if (result.IsSuccess)
_logger.LogTrace("Evaluation succesful\r\n\tCommands: {@commands}", commandEnvelope.Messages);
else
_logger.LogTrace("Evaluation failed\r\n\tCommands: {@command}\r\n\tReasons: {@reasons}", commandEnvelope.Messages,
result.Reasons);

return result;
}

public async Task<IResult<IEventEnvelope>> Evaluate(ICommandEnvelope commandEnvelope) =>
await AggregateHandler.Evaluate(commandEnvelope.Messages.ToArray());

public async Task<IResultBase> Apply(IEventEnvelope eventEnvelope)
{
Expand All @@ -92,22 +80,15 @@ public async Task<IResultBase> Apply(IEventEnvelope eventEnvelope)
* events are applied against the latest state.
*/
await ConfirmEvents();

_logger.LogTrace("Events applied: {@events}", eventEnvelope.Messages);

return Result.Ok();
}

public Task Abort(IMessageMetadata metadata)
{
throw new NotImplementedException();
}
=> AggregateHandler.Abort(metadata);

[ReadOnly]
public Task<TSnapshot> Snapshot<TSnapshot>() where TSnapshot : ISnapshot
{
return AggregateHandler.Snapshot<TSnapshot>();
}
public Task<TSnapshot> Snapshot<TSnapshot>() where TSnapshot : ISnapshot => AggregateHandler.Snapshot<TSnapshot>();

/// <summary>
/// Retrieve the current aggregate version from storage
Expand All @@ -125,4 +106,4 @@ public Task<TSnapshot> Snapshot<TSnapshot>() where TSnapshot : ISnapshot
/// <param name="expectedversion">The expected version after the events had been applied</param>
/// <returns>A boolean value indicating success state</returns>
public abstract Task<bool> ApplyUpdatesToStorage(IReadOnlyList<IEventEnvelope> updates, int expectedversion);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,5 @@ namespace Whaally.Domain.Infrastructure.OrleansHost;

public interface IAggregateHandlerGrain<TAggregate> : IGrainWithGuidKey,
IAggregateHandler<TAggregate>
where TAggregate : class, IAggregate
{

}
where TAggregate : class, IAggregate;

0 comments on commit 1e426f3

Please sign in to comment.