Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fetch for writing optimization for Inline projections #3290

Merged
merged 2 commits into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/DocumentDbTests/SessionMechanics/identity_map_mechanics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,25 @@ public async Task given_record_with_same_id_already_added_then_map_should_be_upd

}

[Fact]
public async Task opt_into_identity_map_with_lightweight_sessions()
{
var target = Target.Random();

theSession.Store(target);
await theSession.SaveChangesAsync();

await using var lightweight = theStore.LightweightSession();
lightweight.UseIdentityMapFor<Target>();

var target1 = await lightweight.LoadAsync<Target>(target.Id);
var target2 = await lightweight.LoadAsync<Target>(target.Id);
var target3 = await lightweight.LoadAsync<Target>(target.Id);

target1.ShouldBeTheSameAs(target2);
target1.ShouldBeTheSameAs(target3);
}

public identity_map_mechanics(DefaultStoreFixture fixture): base(fixture)
{
}
Expand Down
11 changes: 7 additions & 4 deletions src/DocumentDbTests/Writing/document_inserts.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using Marten.Exceptions;
using Marten.Testing.Documents;
using Marten.Testing.Harness;
Expand All @@ -26,20 +27,22 @@ public void can_insert_all_new_documents()
}

[Fact]
public void can_insert_a_mixed_bag_of_documents()
public async Task can_insert_a_mixed_bag_of_documents()
{
await theStore.Advanced.Clean.DeleteDocumentsByTypeAsync(typeof(Target));

var docs = new object[]
{
Target.Random(), Target.Random(), Target.Random(), new User(), new User(), new User(), new User()
};

using (var session = theStore.LightweightSession())
await using (var session = theStore.LightweightSession())
{
session.InsertObjects(docs);
session.SaveChanges();
await session.SaveChangesAsync();
}

using (var query = theStore.QuerySession())
await using (var query = theStore.QuerySession())
{
query.Query<Target>().Count().ShouldBe(3);
query.Query<User>().Count().ShouldBe(4);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,42 @@ public async Task fetch_new_stream_for_writing_Guid_identifier()
document.CCount.ShouldBe(2);
}

[Fact]
public async Task silently_turns_on_identity_map_for_inline_aggregates()
{
StoreOptions(opts => opts.Projections.Snapshot<SimpleAggregate>(SnapshotLifecycle.Inline));

var streamId = Guid.NewGuid();

var stream = await theSession.Events.FetchForWriting<SimpleAggregate>(streamId);
stream.Aggregate.ShouldBeNull();
stream.CurrentVersion.ShouldBe(0);

stream.AppendOne(new AEvent());
stream.AppendMany(new BEvent(), new BEvent(), new BEvent());
stream.AppendMany(new CEvent(), new CEvent());

await theSession.SaveChangesAsync();

using var session = theStore.LightweightSession();
var existing = await session.Events.FetchForWriting<SimpleAggregate>(streamId);

// Should already be using the identity map
var loadAgain = await session.LoadAsync<SimpleAggregate>(streamId);
loadAgain.ShouldBeTheSameAs(existing.Aggregate);

// Append to the stream and see that the existing aggregate is changed
existing.AppendOne(new AEvent());
await session.SaveChangesAsync();

// 1 from the original version, another we just appended
existing.Aggregate.ACount.ShouldBe(2);

using var query = theStore.QuerySession();
var loadedFresh = await query.LoadAsync<SimpleAggregate>(streamId);
loadedFresh.ACount.ShouldBe(2);
}

[Fact]
public async Task fetch_new_stream_for_writing_Guid_identifier_exception_handling()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using Marten.Events;
using Marten.Exceptions;
Expand Down Expand Up @@ -374,6 +375,10 @@ public class SimpleAggregate
// This will be the aggregate version
public long Version { get; set; }

public SimpleAggregate()
{
Debug.WriteLine("Here");
}

public Guid Id { get; set; }

Expand Down
4 changes: 3 additions & 1 deletion src/Marten/Events/Aggregation/AggregationRuntime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ public async ValueTask ApplyChangesAsync(DocumentSessionBase session,
var aggregate = slice.Aggregate;
if (slice.Aggregate == null && lifecycle == ProjectionLifecycle.Inline)
{
aggregate = await Storage.LoadAsync(slice.Id, session, cancellation).ConfigureAwait(false);
// It's actually important to go in through the front door and use the session so that
// the identity map can kick in here
aggregate = await session.LoadAsync<TDoc>(slice.Id, cancellation).ConfigureAwait(false);
}

// Does the aggregate already exist before the events are applied?
Expand Down
4 changes: 4 additions & 0 deletions src/Marten/Events/Fetching/FetchInlinedPlan.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ public async Task<IEventStream<TDoc>> FetchForWriting(DocumentSessionBase sessio
await _identityStrategy.EnsureEventStorageExists<TDoc>(session, cancellation).ConfigureAwait(false);
await session.Database.EnsureStorageExistsAsync(typeof(TDoc), cancellation).ConfigureAwait(false);

// Opt into the identity map mechanics for this aggregate type just in case
// you're using a lightweight session
session.UseIdentityMapFor<TDoc>();

if (forUpdate)
{
await session.BeginTransactionAsync(cancellation).ConfigureAwait(false);
Expand Down
9 changes: 9 additions & 0 deletions src/Marten/IDocumentOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -231,4 +231,13 @@ public interface IDocumentOperations: IQuerySession
/// <param name="sql"></param>
/// <param name="parameterValues"></param>
void QueueSqlCommand(string sql, params object[] parameterValues);


/// <summary>
/// In the case of a lightweight session, this will direct Marten to opt into identity map mechanics
/// for only the document type T. This is a micro-optimization added for the event sourcing + projections
/// support
/// </summary>
/// <typeparam name="T"></typeparam>
public void UseIdentityMapFor<T>();
}
25 changes: 11 additions & 14 deletions src/Marten/Internal/Sessions/DocumentSessionBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ public void SetHeader(string key, object value)
return tenantSession;
}

protected override IQueryEventStore CreateEventStore(DocumentStore store, Tenant tenant)
protected override IQueryEventStore createEventStore(DocumentStore store, Tenant tenant)
{
return new EventStore(this, store, tenant);
}
Expand Down Expand Up @@ -365,22 +365,19 @@ private void store<T>(IEnumerable<T> entities) where T : notnull

private void storeEntity<T>(T entity, IDocumentStorage<T> storage) where T : notnull
{
if (entity is IVersioned versioned)
switch (entity)
{
if (versioned.Version != Guid.Empty)
{
storage.Store(this, entity, versioned.Version);
case IVersioned v when v.Version != Guid.Empty:
storage.Store(this, entity, v.Version);
return;
}
}
else if (entity is IRevisioned revisioned && revisioned.Version != 0)
{
storage.Store(this, entity, revisioned.Version);
return;
case IRevisioned r when r.Version != 0:
storage.Store(this, entity, r.Version);
return;
default:
// Put it in the identity map -- if necessary
storage.Store(this, entity);
break;
}

// Put it in the identity map -- if necessary
storage.Store(this, entity);
}

public void EjectPatchedTypes(IUnitOfWork changes)
Expand Down
7 changes: 7 additions & 0 deletions src/Marten/Internal/Sessions/LightweightSession.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#nullable enable
using System;
using JasperFx.Core;
using Marten.Internal.Storage;
using Marten.Services;

Expand All @@ -14,6 +15,12 @@ internal LightweightSession(DocumentStore store, SessionOptions sessionOptions,

internal override DocumentTracking TrackingMode => DocumentTracking.None;

public override void UseIdentityMapFor<T>()
{
var documentStorage = _providers.StorageFor<T>().IdentityMap;
overrideStorage(typeof(T), documentStorage);
}

protected internal override IDocumentStorage<T> selectStorage<T>(DocumentProvider<T> provider)
{
return provider.Lightweight;
Expand Down
1 change: 0 additions & 1 deletion src/Marten/Internal/Sessions/QuerySession.Load.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using JasperFx.CodeGeneration;
using JasperFx.Core.Reflection;
using Marten.Exceptions;
using Marten.Internal.Storage;
Expand Down
25 changes: 23 additions & 2 deletions src/Marten/Internal/Sessions/QuerySession.Storage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,14 @@ namespace Marten.Internal.Sessions;

public partial class QuerySession
{
private readonly IProviderGraph _providers;
protected readonly IProviderGraph _providers;

private ImHashMap<Type, IDocumentStorage> _byType = ImHashMap<Type, IDocumentStorage>.Empty;
protected ImHashMap<Type, IDocumentStorage> _byType = ImHashMap<Type, IDocumentStorage>.Empty;

protected void overrideStorage(Type type, IDocumentStorage storage)
{
_byType = _byType.AddOrUpdate(type, storage);
}

public IDocumentStorage StorageFor(Type documentType)
{
Expand All @@ -29,9 +34,25 @@ public IDocumentStorage StorageFor(Type documentType)

public IDocumentStorage<T> StorageFor<T>() where T : notnull
{
if (_byType.TryFind(typeof(T), out var storage))
{
return (IDocumentStorage<T>)storage;
}

return selectStorage(_providers.StorageFor<T>());
}

/// <summary>
/// In the case of a lightweight session, this will direct Marten to opt into identity map mechanics
/// for only the document type T. This is a micro-optimization added for the event sourcing + projections
/// support
/// </summary>
/// <typeparam name="T"></typeparam>
public virtual void UseIdentityMapFor<T>()
{
// Nothing by default
}

public IEventStorage EventStorage()
{
return (IEventStorage)selectStorage(_providers.StorageFor<IEvent>());
Expand Down
29 changes: 14 additions & 15 deletions src/Marten/Internal/Sessions/QuerySession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public partial class QuerySession: IMartenSession, IQuerySession
public StoreOptions Options { get; }
public IQueryEventStore Events { get; }

protected virtual IQueryEventStore CreateEventStore(DocumentStore store, Tenant tenant)
protected virtual IQueryEventStore createEventStore(DocumentStore store, Tenant tenant)
{
return new QueryEventStore(this, store, tenant);
}
Expand Down Expand Up @@ -71,7 +71,7 @@ internal QuerySession(
Serializer = store.Serializer;
Options = store.Options;

Events = CreateEventStore(store, tenant ?? sessionOptions.Tenant);
Events = createEventStore(store, tenant ?? sessionOptions.Tenant);

Logger = store.Options.Logger().StartSession(this);

Expand All @@ -84,20 +84,19 @@ public NpgsqlConnection Connection
{
get
{
if (_connection is IAlwaysConnectedLifetime lifetime)
switch (_connection)
{
return lifetime.Connection;
}
else if (_connection is ITransactionStarter starter)
{
var l = starter.Start();
_connection = l;
return l.Connection;
}
else
{
throw new InvalidOperationException(
$"The current lifetime {_connection} is neither a {nameof(IAlwaysConnectedLifetime)} nor a {nameof(ITransactionStarter)}");
case IAlwaysConnectedLifetime lifetime:
return lifetime.Connection;
case ITransactionStarter starter:
{
var l = starter.Start();
_connection = l;
return l.Connection;
}
default:
throw new InvalidOperationException(
$"The current lifetime {_connection} is neither a {nameof(IAlwaysConnectedLifetime)} nor a {nameof(ITransactionStarter)}");
}
}
}
Expand Down
Loading