Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the session adapter class public #6526

Merged
merged 2 commits into from
Sep 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ NServiceBus.InferredMessageTypeEnricherBehavior - InstancePerCall
NServiceBus.Transport.IDispatchMessages - SingleInstance
NServiceBus.Unicast.MessageHandlerRegistry - SingleInstance
----------- Registrations not used by the core, can be removed in next major if downstreams have been confirmed to not use it -----------
NServiceBus.CompletableSynchronizedStorageSessionAdapter - InstancePerUnitOfWork
NServiceBus.CriticalError - SingleInstance
NServiceBus.Hosting.HostInformation - SingleInstance
NServiceBus.MessageInterfaces.IMessageMapper - SingleInstance
NServiceBus.NoOpCanceling - SingleInstance
NServiceBus.Notifications - SingleInstance
NServiceBus.ObjectBuilder.IBuilder - InstancePerCall
NServiceBus.Persistence.CompletableSynchronizedStorageSession - InstancePerUnitOfWork
NServiceBus.Persistence.SynchronizedStorageSession - InstancePerUnitOfWork
NServiceBus.Pipeline.LogicalMessageFactory - SingleInstance
NServiceBus.Settings.ReadOnlySettings - SingleInstance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,21 +63,20 @@ protected override async Task OnStart(IMessageSession session)
{
using (var childBuilder = provider.CreateChildBuilder())
using (var completableSynchronizedStorageSession =
childBuilder.Build<CompletableSynchronizedStorageSession>())
childBuilder.Build<CompletableSynchronizedStorageSessionAdapter>())
{
scenarioContext.AdaptedSessionIsNullBeforeOpening =
completableSynchronizedStorageSession.GetAdaptedSession() == null;
completableSynchronizedStorageSession.AdaptedSession == null;

await completableSynchronizedStorageSession.Open(new ContextBag());

scenarioContext.AdaptedSessionNotNullAfterOpening =
completableSynchronizedStorageSession.GetAdaptedSession() != null;
completableSynchronizedStorageSession.AdaptedSession != null;

var synchronizedStorage = childBuilder.Build<SynchronizedStorageSession>();

scenarioContext.StorageSessionEqual =
completableSynchronizedStorageSession.GetAdaptedSession()
.Equals(synchronizedStorage);
completableSynchronizedStorageSession.AdaptedSession == synchronizedStorage;

await completableSynchronizedStorageSession.CompleteAsync();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ namespace NServiceBus
public static void DoNotEnforceBestPractices(this NServiceBus.Pipeline.IUnsubscribeContext context) { }
public static bool IgnoredBestPractices(this NServiceBus.Extensibility.ExtendableOptions options) { }
}
public sealed class CompletableSynchronizedStorageSessionAdapter : System.IDisposable
{
public CompletableSynchronizedStorageSessionAdapter(NServiceBus.Persistence.ISynchronizedStorageAdapter synchronizedStorageAdapter, NServiceBus.Persistence.ISynchronizedStorage synchronizedStorage) { }
public NServiceBus.Persistence.CompletableSynchronizedStorageSession AdaptedSession { get; }
public System.Threading.Tasks.Task CompleteAsync() { }
public void Dispose() { }
public System.Threading.Tasks.Task Open(NServiceBus.Extensibility.ContextBag contextBag) { }
public System.Threading.Tasks.Task<bool> TryOpen(NServiceBus.Outbox.OutboxTransaction transaction, NServiceBus.Extensibility.ContextBag context) { }
public System.Threading.Tasks.Task<bool> TryOpen(NServiceBus.Transport.TransportTransaction transportTransaction, NServiceBus.Extensibility.ContextBag context) { }
}
public class static ConfigurationTimeoutExtensions
{
public static void TimeToWaitBeforeTriggeringCriticalErrorOnTimeoutOutages(this NServiceBus.EndpointConfiguration config, System.TimeSpan timeToWait) { }
Expand Down Expand Up @@ -2040,12 +2050,8 @@ namespace NServiceBus.Persistence
}
public class static CompletableSynchronizedStorageSessionExtensions
{
public static NServiceBus.Persistence.CompletableSynchronizedStorageSession GetAdaptedSession(this NServiceBus.Persistence.CompletableSynchronizedStorageSession session) { }
public static System.Threading.Tasks.Task Open(this NServiceBus.Persistence.CompletableSynchronizedStorageSession session, NServiceBus.Pipeline.IIncomingLogicalMessageContext context) { }
public static System.Threading.Tasks.Task Open(this NServiceBus.Persistence.CompletableSynchronizedStorageSession session, NServiceBus.Outbox.OutboxTransaction outboxTransaction, NServiceBus.Transport.TransportTransaction transportTransaction, NServiceBus.Extensibility.ContextBag contextBag) { }
public static System.Threading.Tasks.Task Open(this NServiceBus.Persistence.CompletableSynchronizedStorageSession session, NServiceBus.Extensibility.ContextBag context) { }
public static System.Threading.Tasks.Task<bool> TryOpen(this NServiceBus.Persistence.CompletableSynchronizedStorageSession session, NServiceBus.Outbox.OutboxTransaction transaction, NServiceBus.Extensibility.ContextBag context) { }
public static System.Threading.Tasks.Task<bool> TryOpen(this NServiceBus.Persistence.CompletableSynchronizedStorageSession session, NServiceBus.Transport.TransportTransaction transportTransaction, NServiceBus.Extensibility.ContextBag context) { }
public static System.Threading.Tasks.Task Open(this NServiceBus.CompletableSynchronizedStorageSessionAdapter sessionAdapter, NServiceBus.Pipeline.IIncomingLogicalMessageContext context) { }
public static System.Threading.Tasks.Task Open(this NServiceBus.CompletableSynchronizedStorageSessionAdapter sessionAdapter, NServiceBus.Outbox.OutboxTransaction outboxTransaction, NServiceBus.Transport.TransportTransaction transportTransaction, NServiceBus.Extensibility.ContextBag contextBag) { }
}
public interface ISynchronizedStorage
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ namespace NServiceBus
public static void DoNotEnforceBestPractices(this NServiceBus.Pipeline.IUnsubscribeContext context) { }
public static bool IgnoredBestPractices(this NServiceBus.Extensibility.ExtendableOptions options) { }
}
public sealed class CompletableSynchronizedStorageSessionAdapter : System.IDisposable
{
public CompletableSynchronizedStorageSessionAdapter(NServiceBus.Persistence.ISynchronizedStorageAdapter synchronizedStorageAdapter, NServiceBus.Persistence.ISynchronizedStorage synchronizedStorage) { }
public NServiceBus.Persistence.CompletableSynchronizedStorageSession AdaptedSession { get; }
public System.Threading.Tasks.Task CompleteAsync() { }
public void Dispose() { }
public System.Threading.Tasks.Task Open(NServiceBus.Extensibility.ContextBag contextBag) { }
public System.Threading.Tasks.Task<bool> TryOpen(NServiceBus.Outbox.OutboxTransaction transaction, NServiceBus.Extensibility.ContextBag context) { }
public System.Threading.Tasks.Task<bool> TryOpen(NServiceBus.Transport.TransportTransaction transportTransaction, NServiceBus.Extensibility.ContextBag context) { }
}
public class static ConfigurationTimeoutExtensions
{
public static void TimeToWaitBeforeTriggeringCriticalErrorOnTimeoutOutages(this NServiceBus.EndpointConfiguration config, System.TimeSpan timeToWait) { }
Expand Down Expand Up @@ -2042,12 +2052,8 @@ namespace NServiceBus.Persistence
}
public class static CompletableSynchronizedStorageSessionExtensions
{
public static NServiceBus.Persistence.CompletableSynchronizedStorageSession GetAdaptedSession(this NServiceBus.Persistence.CompletableSynchronizedStorageSession session) { }
public static System.Threading.Tasks.Task Open(this NServiceBus.Persistence.CompletableSynchronizedStorageSession session, NServiceBus.Pipeline.IIncomingLogicalMessageContext context) { }
public static System.Threading.Tasks.Task Open(this NServiceBus.Persistence.CompletableSynchronizedStorageSession session, NServiceBus.Outbox.OutboxTransaction outboxTransaction, NServiceBus.Transport.TransportTransaction transportTransaction, NServiceBus.Extensibility.ContextBag contextBag) { }
public static System.Threading.Tasks.Task Open(this NServiceBus.Persistence.CompletableSynchronizedStorageSession session, NServiceBus.Extensibility.ContextBag context) { }
public static System.Threading.Tasks.Task<bool> TryOpen(this NServiceBus.Persistence.CompletableSynchronizedStorageSession session, NServiceBus.Outbox.OutboxTransaction transaction, NServiceBus.Extensibility.ContextBag context) { }
public static System.Threading.Tasks.Task<bool> TryOpen(this NServiceBus.Persistence.CompletableSynchronizedStorageSession session, NServiceBus.Transport.TransportTransaction transportTransaction, NServiceBus.Extensibility.ContextBag context) { }
public static System.Threading.Tasks.Task Open(this NServiceBus.CompletableSynchronizedStorageSessionAdapter sessionAdapter, NServiceBus.Pipeline.IIncomingLogicalMessageContext context) { }
public static System.Threading.Tasks.Task Open(this NServiceBus.CompletableSynchronizedStorageSessionAdapter sessionAdapter, NServiceBus.Outbox.OutboxTransaction outboxTransaction, NServiceBus.Transport.TransportTransaction transportTransaction, NServiceBus.Extensibility.ContextBag contextBag) { }
}
public interface ISynchronizedStorage
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public LoadHandlersConnector(MessageHandlerRegistry messageHandlerRegistry) =>
public override async Task Invoke(IIncomingLogicalMessageContext context, Func<IInvokeHandlerContext, Task> stage)
{
// registered in UnitOfWork scope, therefore can't be resolved at connector construction time
using (var storageSessionAdapter = context.Builder.Build<CompletableSynchronizedStorageSession>())
using (var storageSessionAdapter = context.Builder.Build<CompletableSynchronizedStorageSessionAdapter>())
{
await storageSessionAdapter.Open(context).ConfigureAwait(false);

Expand All @@ -38,7 +38,7 @@ public override async Task Invoke(IIncomingLogicalMessageContext context, Func<I
{
messageHandler.Instance = context.Builder.Build(messageHandler.HandlerType);

var handlingContext = this.CreateInvokeHandlerContext(messageHandler, storageSessionAdapter.GetAdaptedSession(), context);
var handlingContext = this.CreateInvokeHandlerContext(messageHandler, storageSessionAdapter.AdaptedSession, context);
await stage(handlingContext).ConfigureAwait(false);

if (handlingContext.HandlerInvocationAborted)
Expand Down
2 changes: 1 addition & 1 deletion src/NServiceBus.Core/Receiving/ReceiveComponent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public static ReceiveComponent Initialize(
return new TransportReceiveToPhysicalMessageConnector(storage);
}, "Allows to abort processing the message");

hostingConfiguration.Container.ConfigureComponent<CompletableSynchronizedStorageSession>(b =>
hostingConfiguration.Container.ConfigureComponent(b =>
{
var adapter = hostingConfiguration.Container.HasComponent<ISynchronizedStorageAdapter>() ? b.Build<ISynchronizedStorageAdapter>() : new NoOpSynchronizedStorageAdapter();
var syncStorage = hostingConfiguration.Container.HasComponent<ISynchronizedStorage>() ? b.Build<ISynchronizedStorage>() : new NoOpSynchronizedStorage();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,36 @@
namespace NServiceBus
{
using System;
using System.Threading.Tasks;
using Extensibility;
using Janitor;
using Outbox;
using Persistence;
using Transport;

/// <summary>
/// Wraps the logic to open a <see cref="CompletableSynchronizedStorageSession"/> via <see cref="ISynchronizedStorageAdapter"/> and <see cref="ISynchronizedStorage"/>.
/// </summary>
[SkipWeaving]
sealed class CompletableSynchronizedStorageSessionAdapter : CompletableSynchronizedStorageSession
public sealed class CompletableSynchronizedStorageSessionAdapter : IDisposable
{
/// <summary>
/// Creates a new instance of <see cref="CompletableSynchronizedStorageSessionAdapter"/>.
/// </summary>
public CompletableSynchronizedStorageSessionAdapter(ISynchronizedStorageAdapter synchronizedStorageAdapter, ISynchronizedStorage synchronizedStorage)
{
this.synchronizedStorage = synchronizedStorage;
this.synchronizedStorageAdapter = synchronizedStorageAdapter;
}

/// <summary>
/// Access to the <see cref="CompletableSynchronizedStorageSession"/> once it has been opened.
/// </summary>
public CompletableSynchronizedStorageSession AdaptedSession { get; private set; }

/// <summary>
/// Disposes the adapter and the underlying <see cref="CompletableSynchronizedStorageSession"/>.
/// </summary>
public void Dispose()
{
if (disposed)
Expand All @@ -28,22 +42,34 @@ public void Dispose()
disposed = true;
}

/// <summary>
/// Completes the underlying <see cref="CompletableSynchronizedStorageSession"/>.
/// </summary>
public Task CompleteAsync() => AdaptedSession.CompleteAsync();

/// <summary>
/// Tries to open a <see cref="CompletableSynchronizedStorageSession"/> based on a given <see cref="OutboxTransaction"/>.
/// </summary>
public async Task<bool> TryOpen(OutboxTransaction transaction, ContextBag context)
{
AdaptedSession = await synchronizedStorageAdapter.TryAdapt(transaction, context).ConfigureAwait(false);

return AdaptedSession != null;
}

/// <summary>
/// Tries to open a <see cref="CompletableSynchronizedStorageSession"/> based on a given <see cref="TransportTransaction"/>.
/// </summary>
public async Task<bool> TryOpen(TransportTransaction transportTransaction, ContextBag context)
{
AdaptedSession = await synchronizedStorageAdapter.TryAdapt(transportTransaction, context).ConfigureAwait(false);

return AdaptedSession != null;
}

/// <summary>
/// Opens a <see cref="CompletableSynchronizedStorageSession"/> via the persister that is not connected to an outbox or transport transaction.
/// </summary>
public async Task Open(ContextBag contextBag) => AdaptedSession = await synchronizedStorage.OpenSession(contextBag).ConfigureAwait(false);

bool disposed;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,75 +14,38 @@ public static class CompletableSynchronizedStorageSessionExtensions
/// <summary>
/// Opens the storage session by attempting to extract the outbox and transport transaction from the incoming logical context.
/// </summary>
/// <param name="session">The storage session.</param>
/// <param name="sessionAdapter">The storage session.</param>
/// <param name="context">The context information.</param>
public static Task Open(this CompletableSynchronizedStorageSession session,
public static Task Open(this CompletableSynchronizedStorageSessionAdapter sessionAdapter,
IIncomingLogicalMessageContext context)
{
var outboxTransaction = context.Extensions.Get<OutboxTransaction>();
var transportTransaction = context.Extensions.Get<TransportTransaction>();
return session.Open(outboxTransaction, transportTransaction, context.Extensions);
return sessionAdapter.Open(outboxTransaction, transportTransaction, context.Extensions);
}

/// <summary>
/// Opens the storage session based on the outbox or the transport transaction.
/// </summary>
/// <param name="session">The storage session.</param>
/// <param name="sessionAdapter">The storage session.</param>
/// <param name="outboxTransaction">The outbox transaction.</param>
/// <param name="transportTransaction">The transport transaction.</param>
/// <param name="contextBag">The context bag.</param>
public static async Task Open(this CompletableSynchronizedStorageSession session,
public static async Task Open(this CompletableSynchronizedStorageSessionAdapter sessionAdapter,
OutboxTransaction outboxTransaction, TransportTransaction transportTransaction,
ContextBag contextBag)
{
if (await session.TryOpen(outboxTransaction, contextBag).ConfigureAwait(false))
if (await sessionAdapter.TryOpen(outboxTransaction, contextBag).ConfigureAwait(false))
{
return;
}

if (await session.TryOpen(transportTransaction, contextBag).ConfigureAwait(false))
if (await sessionAdapter.TryOpen(transportTransaction, contextBag).ConfigureAwait(false))
{
return;
}

await session.Open(contextBag).ConfigureAwait(false);
await sessionAdapter.Open(contextBag).ConfigureAwait(false);
}

/// <summary>
/// Tries to open the storage session with the provided outbox transaction.
/// </summary>
/// <param name="session">The storage session.</param>
/// <param name="transaction">Outbox transaction.</param>
/// <param name="context">Context.</param>
/// <returns><c>true</c> when the session was opened; otherwise <c>false</c>.</returns>
public static Task<bool> TryOpen(this CompletableSynchronizedStorageSession session,
OutboxTransaction transaction, ContextBag context) =>
((CompletableSynchronizedStorageSessionAdapter)session).TryOpen(transaction, context);

/// <summary>
/// Tries to open the storage session with the provided transport transaction.
/// </summary>
/// <param name="session">The storage session.</param>
/// <param name="transportTransaction">Transport transaction.</param>
/// <param name="context">Context.</param>
/// <returns><c>true</c> when the session was opened; otherwise <c>false</c>.</returns>
public static Task<bool> TryOpen(this CompletableSynchronizedStorageSession session,
TransportTransaction transportTransaction, ContextBag context) =>
((CompletableSynchronizedStorageSessionAdapter)session).TryOpen(transportTransaction, context);

/// <summary>
/// Opens the storage session.
/// </summary>
/// <param name="session">The storage session.</param>
/// <param name="context">The context information.</param>
public static Task Open(this CompletableSynchronizedStorageSession session, ContextBag context) =>
((CompletableSynchronizedStorageSessionAdapter)session).Open(context);

/// <summary>
/// Returns the underlying, persistence specific, storage session.
/// </summary>
/// <param name="session">The storage session.</param>
public static CompletableSynchronizedStorageSession GetAdaptedSession(this CompletableSynchronizedStorageSession session) =>
((CompletableSynchronizedStorageSessionAdapter)session).AdaptedSession;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@timbussmann We discussed this during this morning's sync. I agree this looks very confusing, but keeping v7 and v8 aligned should be a priority, especially due to how we work and how much context is lost around these changes.

Would it be helpful if we modify this as proposed in #6529 ?

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ internal SynchronizedStorage()
protected internal override void Setup(FeatureConfigurationContext context)
{
context.Container.ConfigureComponent<SynchronizedStorageSession>(
builder => builder.Build<CompletableSynchronizedStorageSession>().GetAdaptedSession(),
builder => builder.Build<CompletableSynchronizedStorageSessionAdapter>().AdaptedSession,
DependencyLifecycle.InstancePerUnitOfWork);
}
}
Expand Down
Loading