Skip to content

Commit

Permalink
Add ability to open storage session outside the pipeline (#6467)
Browse files Browse the repository at this point in the history
* Introduce SynchronizedStorage feature
Bring in feature dependency
Approve API

* Adapter to bridge the old and the new world

* Make it possible to retrieve the storage session via DI and also open the completable one from outside the core

* Add acceptance test that verifies session can be opened outside the pipeline

* Additional checks

* Shorter endpoint name

* Reregister the CompletableSynchronizedStorageSession properly so that we don't override

Co-authored-by: Tomasz Masternak <tomasz.masternak@particular.net>

* Fix synchronized storage retrival

* Update src/NServiceBus.Core/Pipeline/Incoming/LoadHandlersConnector.cs

Co-authored-by: Tim Bussmann <timbussmann@users.noreply.github.com>

* make GetAdaptedSession extension public

* Session disposal (#6525)

* resolve storage session per call

* only dispose session once

* approve scope change in test

* change registration back to same scope as the referenced type

* Use GetAdaptedSession consistently

* Make the session adapter class public (#6526)

* make the session adapter class public

* Approve

Co-authored-by: Daniel Marbach <daniel.marbach@openplace.net>

* Move CompletableSynchronizedStorageSessionAdapter into the persistence namespace

* Rename paramater

* aligning names

Co-authored-by: Tomasz Masternak <tomasz.masternak@particular.net>
Co-authored-by: Tim Bussmann <timbussmann@users.noreply.github.com>
  • Loading branch information
3 people authored Sep 9, 2022
1 parent 6b37622 commit ede5f8f
Show file tree
Hide file tree
Showing 23 changed files with 454 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ NServiceBus.MessageInterfaces.IMessageMapper - SingleInstance
NServiceBus.NoOpCanceling - SingleInstance
NServiceBus.Notifications - SingleInstance
NServiceBus.ObjectBuilder.IBuilder - InstancePerCall
NServiceBus.Persistence.CompletableSynchronizedStorageSessionAdapter - InstancePerUnitOfWork
NServiceBus.Persistence.SynchronizedStorageSession - InstancePerUnitOfWork
NServiceBus.Pipeline.LogicalMessageFactory - SingleInstance
NServiceBus.Settings.ReadOnlySettings - SingleInstance
NServiceBus.Unicast.Messages.MessageMetadataRegistry - SingleInstance
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
using Unicast.Subscriptions;
using Unicast.Subscriptions.MessageDrivenSubscriptions;

public class When_a_persistence_does_not_provide_ISynchronizationContext : NServiceBusAcceptanceTest
public class When_a_persistence_does_not_provide_synchronized_storage_session : NServiceBusAcceptanceTest
{
// Run this test twice to ensure that the NoOpCompletableSynchronizedStorageSession's IDisposable method
// is not altered by Fody to throw an ObjectDisposedException if it was disposed
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
namespace NServiceBus.AcceptanceTests.Core.Persistence
{
using System.Threading.Tasks;
using AcceptanceTesting;
using EndpointTemplates;
using NServiceBus.Persistence;
using NUnit.Framework;

public class When_a_persistence_provides_synchronized_session : NServiceBusAcceptanceTest
{
[Test]
public async Task Synchronized_session_should_be_of_exact_type_provided_by_persistence()
{
var result = await Scenario.Define<Context>()
.WithEndpoint<Endpoint>(e => e.When(b => b.SendLocal(new MyMessage())))
.Done(c => c.MessageReceived)
.Run();

Assert.IsNotNull(result.SynchronizedStorageSessionInstanceInContainer);
Assert.IsNotNull(result.SynchronizedStorageSessionInstanceInHandlingContext);
Assert.AreSame(result.SynchronizedStorageSessionInstanceInContainer, result.SynchronizedStorageSessionInstanceInHandlingContext);
}

class Context : ScenarioContext
{
public SynchronizedStorageSession SynchronizedStorageSessionInstanceInContainer { get; set; }
public SynchronizedStorageSession SynchronizedStorageSessionInstanceInHandlingContext { get; set; }
public bool MessageReceived { get; set; }
}

class Endpoint : EndpointConfigurationBuilder
{
public Endpoint() => EndpointSetup<DefaultServer>();

class MyMessageHandler : IHandleMessages<MyMessage>
{
public MyMessageHandler(Context testContext, SynchronizedStorageSession storageSession)
{
this.testContext = testContext;
testContext.SynchronizedStorageSessionInstanceInContainer = storageSession;
}

public Task Handle(MyMessage message, IMessageHandlerContext context)
{
testContext.SynchronizedStorageSessionInstanceInHandlingContext = context.SynchronizedStorageSession;
testContext.MessageReceived = true;
return Task.FromResult(0);
}

readonly Context testContext;
}
}

public class MyMessage : IMessage
{
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
namespace NServiceBus.AcceptanceTests.Reliability.SynchronizedStorage
{
using System.Threading.Tasks;
using AcceptanceTesting;
using EndpointTemplates;
using Extensibility;
using Features;
using NUnit.Framework;
using ObjectBuilder;
using Persistence;

public class When_opening_storage_session_outside_pipeline : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_provide_adapted_session_with_same_scope()
{
var context = await Scenario.Define<Context>()
.WithEndpoint<Endpoint>()
.Done(c => c.Done)
.Run();

Assert.True(context.AdaptedSessionIsNullBeforeOpening, "The adapted session was not null before opening the session.");
Assert.True(context.AdaptedSessionNotNullAfterOpening, "The adapted session was null after opening the session.");
Assert.True(context.StorageSessionEqual, "The scoped storage session should be equal.");
}

public class Context : ScenarioContext
{
public bool StorageSessionEqual { get; set; }
public bool AdaptedSessionIsNullBeforeOpening { get; set; }
public bool AdaptedSessionNotNullAfterOpening { get; set; }
public bool Done { get; set; }
}

public class Endpoint : EndpointConfigurationBuilder
{
public Endpoint()
{
EndpointSetup<DefaultServer>(c =>
{
c.EnableFeature<Bootstrapper>();
});
}

public class Bootstrapper : Feature
{
public Bootstrapper() => EnableByDefault();

protected override void Setup(FeatureConfigurationContext context)
{
context.RegisterStartupTask(b => new MyTask(b.Build<Context>(), b));
}

public class MyTask : FeatureStartupTask
{
public MyTask(Context scenarioContext, IBuilder provider)
{
this.provider = provider;
this.scenarioContext = scenarioContext;
}

protected override async Task OnStart(IMessageSession session)
{
using (var childBuilder = provider.CreateChildBuilder())
using (var completableSynchronizedStorageSession =
childBuilder.Build<CompletableSynchronizedStorageSessionAdapter>())
{
scenarioContext.AdaptedSessionIsNullBeforeOpening =
completableSynchronizedStorageSession.AdaptedSession == null;

await completableSynchronizedStorageSession.Open(new ContextBag());

scenarioContext.AdaptedSessionNotNullAfterOpening =
completableSynchronizedStorageSession.AdaptedSession != null;

var synchronizedStorage = childBuilder.Build<SynchronizedStorageSession>();

scenarioContext.StorageSessionEqual =
completableSynchronizedStorageSession.AdaptedSession == synchronizedStorage;

await completableSynchronizedStorageSession.CompleteAsync();
}

scenarioContext.Done = true;
}

protected override Task OnStop(IMessageSession session) => Task.FromResult(0);

readonly Context scenarioContext;
readonly IBuilder provider;
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1754,6 +1754,10 @@ namespace NServiceBus.Features
public static bool IsFeatureActive(this NServiceBus.Settings.ReadOnlySettings settings, System.Type featureType) { }
public static bool IsFeatureEnabled(this NServiceBus.Settings.ReadOnlySettings settings, System.Type featureType) { }
}
public class SynchronizedStorage : NServiceBus.Features.Feature
{
protected override void Setup(NServiceBus.Features.FeatureConfigurationContext context) { }
}
public class TimeoutManager : NServiceBus.Features.Feature
{
protected override void Setup(NServiceBus.Features.FeatureConfigurationContext context) { }
Expand Down Expand Up @@ -2034,6 +2038,21 @@ namespace NServiceBus.Persistence
{
System.Threading.Tasks.Task CompleteAsync();
}
public sealed class CompletableSynchronizedStorageSessionAdapter : System.IDisposable
{
public CompletableSynchronizedStorageSessionAdapter(NServiceBus.Persistence.ISynchronizedStorageAdapter synchronizedStorageAdapter, NServiceBus.Persistence.ISynchronizedStorage synchronizedStorage) { }
public NServiceBus.Persistence.CompletableSynchronizedStorageSession AdaptedSession { get; }
public System.Threading.Tasks.Task CompleteAsync() { }
public void Dispose() { }
public System.Threading.Tasks.Task Open(NServiceBus.Extensibility.ContextBag context) { }
public System.Threading.Tasks.Task<bool> TryOpen(NServiceBus.Outbox.OutboxTransaction transaction, NServiceBus.Extensibility.ContextBag context) { }
public System.Threading.Tasks.Task<bool> TryOpen(NServiceBus.Transport.TransportTransaction transportTransaction, NServiceBus.Extensibility.ContextBag context) { }
}
public class static CompletableSynchronizedStorageSessionAdapterExtensions
{
public static System.Threading.Tasks.Task Open(this NServiceBus.Persistence.CompletableSynchronizedStorageSessionAdapter sessionAdapter, NServiceBus.Pipeline.IIncomingLogicalMessageContext context) { }
public static System.Threading.Tasks.Task Open(this NServiceBus.Persistence.CompletableSynchronizedStorageSessionAdapter sessionAdapter, NServiceBus.Outbox.OutboxTransaction outboxTransaction, NServiceBus.Transport.TransportTransaction transportTransaction, NServiceBus.Extensibility.ContextBag contextBag) { }
}
public interface ISynchronizedStorage
{
System.Threading.Tasks.Task<NServiceBus.Persistence.CompletableSynchronizedStorageSession> OpenSession(NServiceBus.Extensibility.ContextBag contextBag);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1756,6 +1756,10 @@ namespace NServiceBus.Features
public static bool IsFeatureActive(this NServiceBus.Settings.ReadOnlySettings settings, System.Type featureType) { }
public static bool IsFeatureEnabled(this NServiceBus.Settings.ReadOnlySettings settings, System.Type featureType) { }
}
public class SynchronizedStorage : NServiceBus.Features.Feature
{
protected override void Setup(NServiceBus.Features.FeatureConfigurationContext context) { }
}
public class TimeoutManager : NServiceBus.Features.Feature
{
protected override void Setup(NServiceBus.Features.FeatureConfigurationContext context) { }
Expand Down Expand Up @@ -2036,6 +2040,21 @@ namespace NServiceBus.Persistence
{
System.Threading.Tasks.Task CompleteAsync();
}
public sealed class CompletableSynchronizedStorageSessionAdapter : System.IDisposable
{
public CompletableSynchronizedStorageSessionAdapter(NServiceBus.Persistence.ISynchronizedStorageAdapter synchronizedStorageAdapter, NServiceBus.Persistence.ISynchronizedStorage synchronizedStorage) { }
public NServiceBus.Persistence.CompletableSynchronizedStorageSession AdaptedSession { get; }
public System.Threading.Tasks.Task CompleteAsync() { }
public void Dispose() { }
public System.Threading.Tasks.Task Open(NServiceBus.Extensibility.ContextBag context) { }
public System.Threading.Tasks.Task<bool> TryOpen(NServiceBus.Outbox.OutboxTransaction transaction, NServiceBus.Extensibility.ContextBag context) { }
public System.Threading.Tasks.Task<bool> TryOpen(NServiceBus.Transport.TransportTransaction transportTransaction, NServiceBus.Extensibility.ContextBag context) { }
}
public class static CompletableSynchronizedStorageSessionAdapterExtensions
{
public static System.Threading.Tasks.Task Open(this NServiceBus.Persistence.CompletableSynchronizedStorageSessionAdapter sessionAdapter, NServiceBus.Pipeline.IIncomingLogicalMessageContext context) { }
public static System.Threading.Tasks.Task Open(this NServiceBus.Persistence.CompletableSynchronizedStorageSessionAdapter sessionAdapter, NServiceBus.Outbox.OutboxTransaction outboxTransaction, NServiceBus.Transport.TransportTransaction transportTransaction, NServiceBus.Extensibility.ContextBag contextBag) { }
}
public interface ISynchronizedStorage
{
System.Threading.Tasks.Task<NServiceBus.Persistence.CompletableSynchronizedStorageSession> OpenSession(NServiceBus.Extensibility.ContextBag contextBag);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
namespace NServiceBus.Core.Tests.Reliability
{
using System.Threading.Tasks;
using Extensibility;
using NServiceBus.Outbox;
using NServiceBus.Persistence;
using NUnit.Framework;
using Transport;

[TestFixture]
public class CompletableSynchronizedStorageSessionAdapterTests
{
[Test]
public async Task Should_dispose_adapted_session_only_once()
{
var storageAdapter = new FakeStorageAdapter();
var sessionAdapter = new CompletableSynchronizedStorageSessionAdapter(storageAdapter, null);
await sessionAdapter.TryOpen(new TransportTransaction(), new ContextBag());

sessionAdapter.Dispose();
sessionAdapter.Dispose();

Assert.AreEqual(1, storageAdapter.StorageSession.DisposeCounter);
}
}

public class FakeStorageAdapter : ISynchronizedStorageAdapter
{
public FakeStorageSession StorageSession { get; } = new FakeStorageSession();

public Task<CompletableSynchronizedStorageSession> TryAdapt(OutboxTransaction transaction, ContextBag context) => Task.FromResult<CompletableSynchronizedStorageSession>(StorageSession);

public Task<CompletableSynchronizedStorageSession> TryAdapt(TransportTransaction transportTransaction, ContextBag context) => Task.FromResult<CompletableSynchronizedStorageSession>(StorageSession);
}

public class FakeStorageSession : CompletableSynchronizedStorageSession
{
public int DisposeCounter { get; private set; }

public void Dispose() => DisposeCounter++;

public Task CompleteAsync() => Task.FromResult(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
using Testing;

[TestFixture]
public class LoadHandlersBehaviorTests
public class LoadHandlersConnectorTests
{
[Test]
public void Should_throw_when_there_are_no_registered_message_handlers()
{
var behavior = new LoadHandlersConnector(new MessageHandlerRegistry(), new InMemorySynchronizedStorage(), new InMemoryTransactionalSynchronizedStorageAdapter());
var behavior = new LoadHandlersConnector(new MessageHandlerRegistry());

var context = new TestableIncomingLogicalMessageContext();

Expand Down
29 changes: 7 additions & 22 deletions src/NServiceBus.Core/Pipeline/Incoming/LoadHandlersConnector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,23 @@
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Extensibility;
using Logging;
using Outbox;
using Persistence;
using Pipeline;
using Transport;
using Unicast;

class LoadHandlersConnector : StageConnector<IIncomingLogicalMessageContext, IInvokeHandlerContext>
{
public LoadHandlersConnector(MessageHandlerRegistry messageHandlerRegistry, ISynchronizedStorage synchronizedStorage, ISynchronizedStorageAdapter adapter)
{
public LoadHandlersConnector(MessageHandlerRegistry messageHandlerRegistry) =>
this.messageHandlerRegistry = messageHandlerRegistry;
this.synchronizedStorage = synchronizedStorage;
this.adapter = adapter;
}

public override async Task Invoke(IIncomingLogicalMessageContext context, Func<IInvokeHandlerContext, Task> stage)
{
var outboxTransaction = context.Extensions.Get<OutboxTransaction>();
var transportTransaction = context.Extensions.Get<TransportTransaction>();
using (var storageSession = await AdaptOrOpenNewSynchronizedStorageSession(transportTransaction, outboxTransaction, context.Extensions).ConfigureAwait(false))
// registered in UnitOfWork scope, therefore can't be resolved at connector construction time
using (var storageSessionAdapter = context.Builder.Build<CompletableSynchronizedStorageSessionAdapter>())
{
await storageSessionAdapter.Open(context).ConfigureAwait(false);

var handlersToInvoke = messageHandlerRegistry.GetHandlersFor(context.Message.MessageType);

if (!context.MessageHandled && handlersToInvoke.Count == 0)
Expand All @@ -44,7 +38,7 @@ public override async Task Invoke(IIncomingLogicalMessageContext context, Func<I
{
messageHandler.Instance = context.Builder.Build(messageHandler.HandlerType);

var handlingContext = this.CreateInvokeHandlerContext(messageHandler, storageSession, context);
var handlingContext = this.CreateInvokeHandlerContext(messageHandler, storageSessionAdapter.AdaptedSession, context);
await stage(handlingContext).ConfigureAwait(false);

if (handlingContext.HandlerInvocationAborted)
Expand All @@ -54,17 +48,10 @@ public override async Task Invoke(IIncomingLogicalMessageContext context, Func<I
}
}
context.MessageHandled = true;
await storageSession.CompleteAsync().ConfigureAwait(false);
await storageSessionAdapter.CompleteAsync().ConfigureAwait(false);
}
}

async Task<CompletableSynchronizedStorageSession> AdaptOrOpenNewSynchronizedStorageSession(TransportTransaction transportTransaction, OutboxTransaction outboxTransaction, ContextBag contextBag)
{
return await adapter.TryAdapt(outboxTransaction, contextBag).ConfigureAwait(false)
?? await adapter.TryAdapt(transportTransaction, contextBag).ConfigureAwait(false)
?? await synchronizedStorage.OpenSession(contextBag).ConfigureAwait(false);
}

static void LogHandlersInvocation(IIncomingLogicalMessageContext context, List<MessageHandler> handlersToInvoke)
{
var builder = new StringBuilder($"Processing message type: {context.Message.MessageType}");
Expand All @@ -85,8 +72,6 @@ static void LogHandlersInvocation(IIncomingLogicalMessageContext context, List<M
logger.Debug(builder.ToString());
}

readonly ISynchronizedStorageAdapter adapter;
readonly ISynchronizedStorage synchronizedStorage;
readonly MessageHandlerRegistry messageHandlerRegistry;

static readonly ILog logger = LogManager.GetLogger<LoadHandlersConnector>();
Expand Down

This file was deleted.

Loading

0 comments on commit ede5f8f

Please sign in to comment.