Skip to content

Commit

Permalink
Port some of the v7 changes for transactional session over (#6530) (#…
Browse files Browse the repository at this point in the history
…6533)

* Bring in two acceptance tests from the v7 work for the transactional session

* Name param the same

* Move saga manifests to the saga persister and introduce the synchronized storage session feature
  • Loading branch information
danielmarbach authored Sep 7, 2022
1 parent cdd3a3d commit bc839e8
Show file tree
Hide file tree
Showing 14 changed files with 199 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public ValueTask<bool> TryOpen(TransportTransaction transportTransaction, Contex
return new ValueTask<bool>(true);
}

public Task Open(ContextBag contextBag, CancellationToken cancellationToken = default)
public Task Open(ContextBag context, CancellationToken cancellationToken = default)
{
ownsTransaction = true;
Transaction = new AcceptanceTestingTransaction();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
namespace NServiceBus.AcceptanceTests.Core.Persistence
{
using System.Threading.Tasks;
using AcceptanceTesting;
using EndpointTemplates;
using NServiceBus.Persistence;
using NUnit.Framework;

public class When_a_persistence_provides_synchronized_session : NServiceBusAcceptanceTest
{
[Test]
public async Task Synchronized_session_should_be_of_exact_type_provided_by_persistence()
{
var result = await Scenario.Define<Context>()
.WithEndpoint<Endpoint>(e => e.When(b => b.SendLocal(new MyMessage())))
.Done(c => c.MessageReceived)
.Run();

Assert.IsNotNull(result.SynchronizedStorageSessionInstanceInContainer);
Assert.IsNotNull(result.SynchronizedStorageSessionInstanceInHandlingContext);
Assert.AreSame(result.SynchronizedStorageSessionInstanceInContainer, result.SynchronizedStorageSessionInstanceInHandlingContext);
}

class Context : ScenarioContext
{
public ISynchronizedStorageSession SynchronizedStorageSessionInstanceInContainer { get; set; }
public ISynchronizedStorageSession SynchronizedStorageSessionInstanceInHandlingContext { get; set; }
public bool MessageReceived { get; set; }
}

class Endpoint : EndpointConfigurationBuilder
{
public Endpoint() => EndpointSetup<DefaultServer>();

class MyMessageHandler : IHandleMessages<MyMessage>
{
public MyMessageHandler(Context testContext, ISynchronizedStorageSession storageSession)
{
this.testContext = testContext;
testContext.SynchronizedStorageSessionInstanceInContainer = storageSession;
}

public Task Handle(MyMessage message, IMessageHandlerContext context)
{
testContext.SynchronizedStorageSessionInstanceInHandlingContext = context.SynchronizedStorageSession;
testContext.MessageReceived = true;
return Task.FromResult(0);
}

readonly Context testContext;
}
}

public class MyMessage : IMessage
{
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
namespace NServiceBus.AcceptanceTests.Reliability.SynchronizedStorage
{
using System;
using System.Threading;
using System.Threading.Tasks;
using AcceptanceTesting;
using EndpointTemplates;
using Extensibility;
using Features;
using Microsoft.Extensions.DependencyInjection;
using NUnit.Framework;
using Persistence;

public class When_opening_storage_session_outside_pipeline : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_provide_adapted_session_with_same_scope()
{
var context = await Scenario.Define<Context>()
.WithEndpoint<Endpoint>()
.Done(c => c.Done)
.Run();

Assert.True(context.SessionNotNullAfterOpening, "The adapted session was null after opening the session.");
Assert.True(context.StorageSessionEqual, "The scoped storage session should be equal.");
}

public class Context : ScenarioContext
{
public bool StorageSessionEqual { get; set; }
public bool SessionNotNullAfterOpening { get; set; }
public bool Done { get; set; }
}

public class Endpoint : EndpointConfigurationBuilder
{
public Endpoint()
{
EndpointSetup<DefaultServer>(c =>
{
c.EnableFeature<Bootstrapper>();
});
}

public class Bootstrapper : Feature
{
public Bootstrapper() => EnableByDefault();

protected override void Setup(FeatureConfigurationContext context)
{
context.RegisterStartupTask(b => new MyTask(b.GetRequiredService<Context>(), b));
}

public class MyTask : FeatureStartupTask
{
public MyTask(Context scenarioContext, IServiceProvider provider)
{
this.provider = provider;
this.scenarioContext = scenarioContext;
}

protected override async Task OnStart(IMessageSession session, CancellationToken cancellationToken = default)
{
using (var scope = provider.CreateScope())
using (var completableSynchronizedStorageSession =
scope.ServiceProvider.GetRequiredService<ICompletableSynchronizedStorageSession>())
{
await completableSynchronizedStorageSession.Open(new ContextBag(), cancellationToken);

scenarioContext.SessionNotNullAfterOpening =
scope.ServiceProvider.GetService<ISynchronizedStorageSession>() != null;

var synchronizedStorage = scope.ServiceProvider.GetService<ISynchronizedStorageSession>();

scenarioContext.StorageSessionEqual =
completableSynchronizedStorageSession == synchronizedStorage;

await completableSynchronizedStorageSession.CompleteAsync(cancellationToken);
}

scenarioContext.Done = true;
}

protected override Task OnStop(IMessageSession session, CancellationToken cancellationToken = default) => Task.CompletedTask;

readonly Context scenarioContext;
readonly IServiceProvider provider;
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1980,7 +1980,7 @@ namespace NServiceBus.Persistence
public interface ICompletableSynchronizedStorageSession : NServiceBus.Persistence.ISynchronizedStorageSession, System.IDisposable
{
System.Threading.Tasks.Task CompleteAsync(System.Threading.CancellationToken cancellationToken = default);
System.Threading.Tasks.Task Open(NServiceBus.Extensibility.ContextBag contextBag, System.Threading.CancellationToken cancellationToken = default);
System.Threading.Tasks.Task Open(NServiceBus.Extensibility.ContextBag context, System.Threading.CancellationToken cancellationToken = default);
System.Threading.Tasks.ValueTask<bool> TryOpen(NServiceBus.Outbox.IOutboxTransaction transaction, NServiceBus.Extensibility.ContextBag context, System.Threading.CancellationToken cancellationToken = default);
System.Threading.Tasks.ValueTask<bool> TryOpen(NServiceBus.Transport.TransportTransaction transportTransaction, NServiceBus.Extensibility.ContextBag context, System.Threading.CancellationToken cancellationToken = default);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public ValueTask<bool> TryOpen(TransportTransaction transportTransaction, Contex
return new ValueTask<bool>(true);
}

public Task Open(ContextBag contextBag, CancellationToken cancellationToken = default)
public Task Open(ContextBag context, CancellationToken cancellationToken = default)
{
ownsTransaction = true;
Transaction = new FakeTransaction();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ public class LearningPersistence : PersistenceDefinition
{
internal LearningPersistence()
{
Defaults(s => s.EnableFeatureByDefault<LearningSynchronizedStorage>());

Supports<StorageType.Sagas>(s => s.EnableFeatureByDefault<LearningSagaPersistence>());
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
namespace NServiceBus
{
using Features;
using Microsoft.Extensions.DependencyInjection;
using Persistence;

sealed class LearningSynchronizedStorage : Feature
{
public LearningSynchronizedStorage()
{
DependsOn<SynchronizedStorage>();
}

protected internal override void Setup(FeatureConfigurationContext context)
{
context.Services.AddScoped<ICompletableSynchronizedStorageSession, LearningSynchronizedStorageSession>();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@ namespace NServiceBus
[SkipWeaving]
class LearningSynchronizedStorageSession : ICompletableSynchronizedStorageSession
{
public LearningSynchronizedStorageSession(SagaManifestCollection sagaManifests)
{
this.sagaManifests = sagaManifests;
}

public void Dispose()
{
foreach (var sagaFile in sagaFiles.Values)
Expand All @@ -34,7 +29,7 @@ public ValueTask<bool> TryOpen(IOutboxTransaction transaction, ContextBag contex
public ValueTask<bool> TryOpen(TransportTransaction transportTransaction, ContextBag context,
CancellationToken cancellationToken = default) => new ValueTask<bool>(false);

public Task Open(ContextBag contextBag, CancellationToken cancellationToken = default)
public Task Open(ContextBag context, CancellationToken cancellationToken = default)
=> Task.CompletedTask;

public async Task CompleteAsync(CancellationToken cancellationToken = default)
Expand All @@ -46,9 +41,9 @@ public async Task CompleteAsync(CancellationToken cancellationToken = default)
deferredActions.Clear();
}

public async Task<TSagaData> Read<TSagaData>(Guid sagaId, CancellationToken cancellationToken = default) where TSagaData : class, IContainSagaData
public async Task<TSagaData> Read<TSagaData>(Guid sagaId, SagaManifestCollection sagaManifests, CancellationToken cancellationToken = default) where TSagaData : class, IContainSagaData
{
var sagaStorageFile = await Open(sagaId, typeof(TSagaData), cancellationToken)
var sagaStorageFile = await Open(sagaId, typeof(TSagaData), sagaManifests, cancellationToken)
.ConfigureAwait(false);

if (sagaStorageFile == null)
Expand All @@ -60,22 +55,22 @@ public async Task<TSagaData> Read<TSagaData>(Guid sagaId, CancellationToken canc
.ConfigureAwait(false);
}

public void Update(IContainSagaData sagaData)
public void Update(IContainSagaData sagaData, SagaManifestCollection sagaManifests)
{
deferredActions.Add(new UpdateAction(sagaData, sagaFiles, sagaManifests));
}

public void Save(IContainSagaData sagaData)
public void Save(IContainSagaData sagaData, SagaManifestCollection sagaManifests)
{
deferredActions.Add(new SaveAction(sagaData, sagaFiles, sagaManifests));
}

public void Complete(IContainSagaData sagaData)
public void Complete(IContainSagaData sagaData, SagaManifestCollection sagaManifests)
{
deferredActions.Add(new CompleteAction(sagaData, sagaFiles, sagaManifests));
}

async Task<SagaStorageFile> Open(Guid sagaId, Type entityType, CancellationToken cancellationToken)
async Task<SagaStorageFile> Open(Guid sagaId, Type entityType, SagaManifestCollection sagaManifests, CancellationToken cancellationToken)
{
var sagaManifest = sagaManifests.GetForEntityType(entityType);

Expand All @@ -90,8 +85,6 @@ async Task<SagaStorageFile> Open(Guid sagaId, Type entityType, CancellationToken
return sagaStorageFile;
}

SagaManifestCollection sagaManifests;

Dictionary<string, SagaStorageFile> sagaFiles = new Dictionary<string, SagaStorageFile>();

List<StorageAction> deferredActions = new List<StorageAction>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using System.IO;
using Microsoft.Extensions.DependencyInjection;
using NServiceBus.Sagas;
using Persistence;

class LearningSagaPersistence : Feature
{
Expand All @@ -22,7 +21,6 @@ protected internal override void Setup(FeatureConfigurationContext context)
var allSagas = context.Settings.Get<SagaMetadataCollection>();

context.Services.AddSingleton(new SagaManifestCollection(allSagas, storageLocation, sagaName => sagaName.Replace("+", "")));
context.Services.AddScoped<ICompletableSynchronizedStorageSession, LearningSynchronizedStorageSession>();
context.Services.AddSingleton<ISagaPersister, LearningSagaPersister>();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,43 +9,47 @@ namespace NServiceBus

class LearningSagaPersister : ISagaPersister
{
public LearningSagaPersister(SagaManifestCollection sagaManifests) => this.sagaManifests = sagaManifests;

public Task Save(IContainSagaData sagaData, SagaCorrelationProperty correlationProperty, ISynchronizedStorageSession session, ContextBag context, CancellationToken cancellationToken = default)
{
var storageSession = (LearningSynchronizedStorageSession)session;
storageSession.Save(sagaData);
storageSession.Save(sagaData, sagaManifests);
return Task.CompletedTask;
}

public Task Update(IContainSagaData sagaData, ISynchronizedStorageSession session, ContextBag context, CancellationToken cancellationToken = default)
{
var storageSession = (LearningSynchronizedStorageSession)session;
storageSession.Update(sagaData);
storageSession.Update(sagaData, sagaManifests);
return Task.CompletedTask;
}

public Task<TSagaData> Get<TSagaData>(Guid sagaId, ISynchronizedStorageSession session, ContextBag context, CancellationToken cancellationToken = default)
where TSagaData : class, IContainSagaData
{
return Get<TSagaData>(sagaId, session, cancellationToken);
return Get<TSagaData>(sagaId, session, sagaManifests, cancellationToken);
}

public Task<TSagaData> Get<TSagaData>(string propertyName, object propertyValue, ISynchronizedStorageSession session, ContextBag context, CancellationToken cancellationToken = default)
where TSagaData : class, IContainSagaData
{
return Get<TSagaData>(LearningSagaIdGenerator.Generate(typeof(TSagaData), propertyName, propertyValue), session, cancellationToken);
return Get<TSagaData>(LearningSagaIdGenerator.Generate(typeof(TSagaData), propertyName, propertyValue), session, sagaManifests, cancellationToken);
}

public Task Complete(IContainSagaData sagaData, ISynchronizedStorageSession session, ContextBag context, CancellationToken cancellationToken = default)
{
var storageSession = (LearningSynchronizedStorageSession)session;
storageSession.Complete(sagaData);
storageSession.Complete(sagaData, sagaManifests);
return Task.CompletedTask;
}

static Task<TSagaData> Get<TSagaData>(Guid sagaId, ISynchronizedStorageSession session, CancellationToken cancellationToken) where TSagaData : class, IContainSagaData
static Task<TSagaData> Get<TSagaData>(Guid sagaId, ISynchronizedStorageSession session, SagaManifestCollection sagaManifests, CancellationToken cancellationToken) where TSagaData : class, IContainSagaData
{
var storageSession = (LearningSynchronizedStorageSession)session;
return storageSession.Read<TSagaData>(sagaId, cancellationToken);
return storageSession.Read<TSagaData>(sagaId, sagaManifests, cancellationToken);
}

readonly SagaManifestCollection sagaManifests;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ public interface ICompletableSynchronizedStorageSession : ISynchronizedStorageSe
/// <summary>
/// Opens the storage session.
/// </summary>
/// <param name="contextBag">The context information.</param>
/// <param name="context">The context information.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> to observe.</param>
Task Open(ContextBag contextBag, CancellationToken cancellationToken = default);
Task Open(ContextBag context, CancellationToken cancellationToken = default);

/// <summary>
/// Completes the session by saving the changes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public ValueTask<bool> TryOpen(TransportTransaction transportTransaction, Contex
CancellationToken cancellationToken = default) =>
new ValueTask<bool>(false);

public Task Open(ContextBag contextBag, CancellationToken cancellationToken = default) =>
public Task Open(ContextBag context, CancellationToken cancellationToken = default) =>
Task.CompletedTask;

public Task CompleteAsync(CancellationToken cancellationToken = default) => Task.CompletedTask;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
----------- Public registrations used by Core -----------
NServiceBus.Persistence.ICompletableSynchronizedStorageSession - Scoped
NServiceBus.ReceiveAddresses - Singleton
NServiceBus.Transport.IMessageDispatcher - Singleton
NServiceBus.Transport.ISubscriptionManager - Singleton
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,14 @@ public partial class PersistenceTestsConfiguration
public Task Configure(CancellationToken cancellationToken = default)
{
SagaIdGenerator = new LearningSagaIdGenerator();
SagaStorage = new LearningSagaPersister();

var sagaManifests = new SagaManifestCollection(SagaMetadataCollection,
Path.Combine(AppDomain.CurrentDomain.BaseDirectory, ".sagas"),
name => DeterministicGuid.Create(name).ToString());

CreateStorageSession = () => new LearningSynchronizedStorageSession(sagaManifests);
SagaStorage = new LearningSagaPersister(sagaManifests);

CreateStorageSession = () => new LearningSynchronizedStorageSession();

return Task.CompletedTask;
}
Expand Down

0 comments on commit bc839e8

Please sign in to comment.