Skip to content

Commit

Permalink
make the session adapter class public
Browse files Browse the repository at this point in the history
  • Loading branch information
timbussmann authored and danielmarbach committed Sep 1, 2022
1 parent ddf8e7b commit 1e12126
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,21 +63,20 @@ protected override async Task OnStart(IMessageSession session)
{
using (var childBuilder = provider.CreateChildBuilder())
using (var completableSynchronizedStorageSession =
childBuilder.Build<CompletableSynchronizedStorageSession>())
childBuilder.Build<CompletableSynchronizedStorageSessionAdapter>())
{
scenarioContext.AdaptedSessionIsNullBeforeOpening =
completableSynchronizedStorageSession.GetAdaptedSession() == null;
completableSynchronizedStorageSession.AdaptedSession == null;

await completableSynchronizedStorageSession.Open(new ContextBag());

scenarioContext.AdaptedSessionNotNullAfterOpening =
completableSynchronizedStorageSession.GetAdaptedSession() != null;
completableSynchronizedStorageSession.AdaptedSession != null;

var synchronizedStorage = childBuilder.Build<SynchronizedStorageSession>();

scenarioContext.StorageSessionEqual =
completableSynchronizedStorageSession.GetAdaptedSession()
.Equals(synchronizedStorage);
completableSynchronizedStorageSession.AdaptedSession == synchronizedStorage;

await completableSynchronizedStorageSession.CompleteAsync();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ namespace NServiceBus
public static void DoNotEnforceBestPractices(this NServiceBus.Pipeline.IUnsubscribeContext context) { }
public static bool IgnoredBestPractices(this NServiceBus.Extensibility.ExtendableOptions options) { }
}
public sealed class CompletableSynchronizedStorageSessionAdapter : System.IDisposable
{
public CompletableSynchronizedStorageSessionAdapter(NServiceBus.Persistence.ISynchronizedStorageAdapter synchronizedStorageAdapter, NServiceBus.Persistence.ISynchronizedStorage synchronizedStorage) { }
public NServiceBus.Persistence.CompletableSynchronizedStorageSession AdaptedSession { get; }
public System.Threading.Tasks.Task CompleteAsync() { }
public void Dispose() { }
public System.Threading.Tasks.Task Open(NServiceBus.Extensibility.ContextBag contextBag) { }
public System.Threading.Tasks.Task<bool> TryOpen(NServiceBus.Outbox.OutboxTransaction transaction, NServiceBus.Extensibility.ContextBag context) { }
public System.Threading.Tasks.Task<bool> TryOpen(NServiceBus.Transport.TransportTransaction transportTransaction, NServiceBus.Extensibility.ContextBag context) { }
}
public class static ConfigurationTimeoutExtensions
{
public static void TimeToWaitBeforeTriggeringCriticalErrorOnTimeoutOutages(this NServiceBus.EndpointConfiguration config, System.TimeSpan timeToWait) { }
Expand Down Expand Up @@ -2040,12 +2050,8 @@ namespace NServiceBus.Persistence
}
public class static CompletableSynchronizedStorageSessionExtensions
{
public static NServiceBus.Persistence.CompletableSynchronizedStorageSession GetAdaptedSession(this NServiceBus.Persistence.CompletableSynchronizedStorageSession session) { }
public static System.Threading.Tasks.Task Open(this NServiceBus.Persistence.CompletableSynchronizedStorageSession session, NServiceBus.Pipeline.IIncomingLogicalMessageContext context) { }
public static System.Threading.Tasks.Task Open(this NServiceBus.Persistence.CompletableSynchronizedStorageSession session, NServiceBus.Outbox.OutboxTransaction outboxTransaction, NServiceBus.Transport.TransportTransaction transportTransaction, NServiceBus.Extensibility.ContextBag contextBag) { }
public static System.Threading.Tasks.Task Open(this NServiceBus.Persistence.CompletableSynchronizedStorageSession session, NServiceBus.Extensibility.ContextBag context) { }
public static System.Threading.Tasks.Task<bool> TryOpen(this NServiceBus.Persistence.CompletableSynchronizedStorageSession session, NServiceBus.Outbox.OutboxTransaction transaction, NServiceBus.Extensibility.ContextBag context) { }
public static System.Threading.Tasks.Task<bool> TryOpen(this NServiceBus.Persistence.CompletableSynchronizedStorageSession session, NServiceBus.Transport.TransportTransaction transportTransaction, NServiceBus.Extensibility.ContextBag context) { }
public static System.Threading.Tasks.Task Open(this NServiceBus.CompletableSynchronizedStorageSessionAdapter sessionAdapter, NServiceBus.Pipeline.IIncomingLogicalMessageContext context) { }
public static System.Threading.Tasks.Task Open(this NServiceBus.CompletableSynchronizedStorageSessionAdapter sessionAdapter, NServiceBus.Outbox.OutboxTransaction outboxTransaction, NServiceBus.Transport.TransportTransaction transportTransaction, NServiceBus.Extensibility.ContextBag contextBag) { }
}
public interface ISynchronizedStorage
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ namespace NServiceBus
public static void DoNotEnforceBestPractices(this NServiceBus.Pipeline.IUnsubscribeContext context) { }
public static bool IgnoredBestPractices(this NServiceBus.Extensibility.ExtendableOptions options) { }
}
public sealed class CompletableSynchronizedStorageSessionAdapter : System.IDisposable
{
public CompletableSynchronizedStorageSessionAdapter(NServiceBus.Persistence.ISynchronizedStorageAdapter synchronizedStorageAdapter, NServiceBus.Persistence.ISynchronizedStorage synchronizedStorage) { }
public NServiceBus.Persistence.CompletableSynchronizedStorageSession AdaptedSession { get; }
public System.Threading.Tasks.Task CompleteAsync() { }
public void Dispose() { }
public System.Threading.Tasks.Task Open(NServiceBus.Extensibility.ContextBag contextBag) { }
public System.Threading.Tasks.Task<bool> TryOpen(NServiceBus.Outbox.OutboxTransaction transaction, NServiceBus.Extensibility.ContextBag context) { }
public System.Threading.Tasks.Task<bool> TryOpen(NServiceBus.Transport.TransportTransaction transportTransaction, NServiceBus.Extensibility.ContextBag context) { }
}
public class static ConfigurationTimeoutExtensions
{
public static void TimeToWaitBeforeTriggeringCriticalErrorOnTimeoutOutages(this NServiceBus.EndpointConfiguration config, System.TimeSpan timeToWait) { }
Expand Down Expand Up @@ -2042,12 +2052,8 @@ namespace NServiceBus.Persistence
}
public class static CompletableSynchronizedStorageSessionExtensions
{
public static NServiceBus.Persistence.CompletableSynchronizedStorageSession GetAdaptedSession(this NServiceBus.Persistence.CompletableSynchronizedStorageSession session) { }
public static System.Threading.Tasks.Task Open(this NServiceBus.Persistence.CompletableSynchronizedStorageSession session, NServiceBus.Pipeline.IIncomingLogicalMessageContext context) { }
public static System.Threading.Tasks.Task Open(this NServiceBus.Persistence.CompletableSynchronizedStorageSession session, NServiceBus.Outbox.OutboxTransaction outboxTransaction, NServiceBus.Transport.TransportTransaction transportTransaction, NServiceBus.Extensibility.ContextBag contextBag) { }
public static System.Threading.Tasks.Task Open(this NServiceBus.Persistence.CompletableSynchronizedStorageSession session, NServiceBus.Extensibility.ContextBag context) { }
public static System.Threading.Tasks.Task<bool> TryOpen(this NServiceBus.Persistence.CompletableSynchronizedStorageSession session, NServiceBus.Outbox.OutboxTransaction transaction, NServiceBus.Extensibility.ContextBag context) { }
public static System.Threading.Tasks.Task<bool> TryOpen(this NServiceBus.Persistence.CompletableSynchronizedStorageSession session, NServiceBus.Transport.TransportTransaction transportTransaction, NServiceBus.Extensibility.ContextBag context) { }
public static System.Threading.Tasks.Task Open(this NServiceBus.CompletableSynchronizedStorageSessionAdapter sessionAdapter, NServiceBus.Pipeline.IIncomingLogicalMessageContext context) { }
public static System.Threading.Tasks.Task Open(this NServiceBus.CompletableSynchronizedStorageSessionAdapter sessionAdapter, NServiceBus.Outbox.OutboxTransaction outboxTransaction, NServiceBus.Transport.TransportTransaction transportTransaction, NServiceBus.Extensibility.ContextBag contextBag) { }
}
public interface ISynchronizedStorage
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public LoadHandlersConnector(MessageHandlerRegistry messageHandlerRegistry) =>
public override async Task Invoke(IIncomingLogicalMessageContext context, Func<IInvokeHandlerContext, Task> stage)
{
// registered in UnitOfWork scope, therefore can't be resolved at connector construction time
using (var storageSessionAdapter = context.Builder.Build<CompletableSynchronizedStorageSession>())
using (var storageSessionAdapter = context.Builder.Build<CompletableSynchronizedStorageSessionAdapter>())
{
await storageSessionAdapter.Open(context).ConfigureAwait(false);

Expand All @@ -38,7 +38,7 @@ public override async Task Invoke(IIncomingLogicalMessageContext context, Func<I
{
messageHandler.Instance = context.Builder.Build(messageHandler.HandlerType);

var handlingContext = this.CreateInvokeHandlerContext(messageHandler, storageSessionAdapter.GetAdaptedSession(), context);
var handlingContext = this.CreateInvokeHandlerContext(messageHandler, storageSessionAdapter.AdaptedSession, context);
await stage(handlingContext).ConfigureAwait(false);

if (handlingContext.HandlerInvocationAborted)
Expand Down
2 changes: 1 addition & 1 deletion src/NServiceBus.Core/Receiving/ReceiveComponent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public static ReceiveComponent Initialize(
return new TransportReceiveToPhysicalMessageConnector(storage);
}, "Allows to abort processing the message");

hostingConfiguration.Container.ConfigureComponent<CompletableSynchronizedStorageSession>(b =>
hostingConfiguration.Container.ConfigureComponent(b =>
{
var adapter = hostingConfiguration.Container.HasComponent<ISynchronizedStorageAdapter>() ? b.Build<ISynchronizedStorageAdapter>() : new NoOpSynchronizedStorageAdapter();
var syncStorage = hostingConfiguration.Container.HasComponent<ISynchronizedStorage>() ? b.Build<ISynchronizedStorage>() : new NoOpSynchronizedStorage();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,36 @@
namespace NServiceBus
{
using System;
using System.Threading.Tasks;
using Extensibility;
using Janitor;
using Outbox;
using Persistence;
using Transport;

/// <summary>
/// Wraps the logic to open a <see cref="CompletableSynchronizedStorageSession"/> via <see cref="ISynchronizedStorageAdapter"/> and <see cref="ISynchronizedStorage"/>.
/// </summary>
[SkipWeaving]
sealed class CompletableSynchronizedStorageSessionAdapter : CompletableSynchronizedStorageSession
public sealed class CompletableSynchronizedStorageSessionAdapter : IDisposable
{
/// <summary>
/// Creates a new instance of <see cref="CompletableSynchronizedStorageSessionAdapter"/>.
/// </summary>
public CompletableSynchronizedStorageSessionAdapter(ISynchronizedStorageAdapter synchronizedStorageAdapter, ISynchronizedStorage synchronizedStorage)
{
this.synchronizedStorage = synchronizedStorage;
this.synchronizedStorageAdapter = synchronizedStorageAdapter;
}

/// <summary>
/// Access to the <see cref="CompletableSynchronizedStorageSession"/> once it has been opened.
/// </summary>
public CompletableSynchronizedStorageSession AdaptedSession { get; private set; }

/// <summary>
/// Disposes the adapter and the underlying <see cref="CompletableSynchronizedStorageSession"/>.
/// </summary>
public void Dispose()
{
if (disposed)
Expand All @@ -28,22 +42,34 @@ public void Dispose()
disposed = true;
}

/// <summary>
/// Completes the underlying <see cref="CompletableSynchronizedStorageSession"/>.
/// </summary>
public Task CompleteAsync() => AdaptedSession.CompleteAsync();

/// <summary>
/// Tries to open a <see cref="CompletableSynchronizedStorageSession"/> based on a given <see cref="OutboxTransaction"/>.
/// </summary>
public async Task<bool> TryOpen(OutboxTransaction transaction, ContextBag context)
{
AdaptedSession = await synchronizedStorageAdapter.TryAdapt(transaction, context).ConfigureAwait(false);

return AdaptedSession != null;
}

/// <summary>
/// Tries to open a <see cref="CompletableSynchronizedStorageSession"/> based on a given <see cref="TransportTransaction"/>.
/// </summary>
public async Task<bool> TryOpen(TransportTransaction transportTransaction, ContextBag context)
{
AdaptedSession = await synchronizedStorageAdapter.TryAdapt(transportTransaction, context).ConfigureAwait(false);

return AdaptedSession != null;
}

/// <summary>
/// Opens a <see cref="CompletableSynchronizedStorageSession"/> via the persister that is not connected to an outbox or transport transaction.
/// </summary>
public async Task Open(ContextBag contextBag) => AdaptedSession = await synchronizedStorage.OpenSession(contextBag).ConfigureAwait(false);

bool disposed;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,75 +14,38 @@ public static class CompletableSynchronizedStorageSessionExtensions
/// <summary>
/// Opens the storage session by attempting to extract the outbox and transport transaction from the incoming logical context.
/// </summary>
/// <param name="session">The storage session.</param>
/// <param name="sessionAdapter">The storage session.</param>
/// <param name="context">The context information.</param>
public static Task Open(this CompletableSynchronizedStorageSession session,
public static Task Open(this CompletableSynchronizedStorageSessionAdapter sessionAdapter,
IIncomingLogicalMessageContext context)
{
var outboxTransaction = context.Extensions.Get<OutboxTransaction>();
var transportTransaction = context.Extensions.Get<TransportTransaction>();
return session.Open(outboxTransaction, transportTransaction, context.Extensions);
return sessionAdapter.Open(outboxTransaction, transportTransaction, context.Extensions);
}

/// <summary>
/// Opens the storage session based on the outbox or the transport transaction.
/// </summary>
/// <param name="session">The storage session.</param>
/// <param name="sessionAdapter">The storage session.</param>
/// <param name="outboxTransaction">The outbox transaction.</param>
/// <param name="transportTransaction">The transport transaction.</param>
/// <param name="contextBag">The context bag.</param>
public static async Task Open(this CompletableSynchronizedStorageSession session,
public static async Task Open(this CompletableSynchronizedStorageSessionAdapter sessionAdapter,
OutboxTransaction outboxTransaction, TransportTransaction transportTransaction,
ContextBag contextBag)
{
if (await session.TryOpen(outboxTransaction, contextBag).ConfigureAwait(false))
if (await sessionAdapter.TryOpen(outboxTransaction, contextBag).ConfigureAwait(false))
{
return;
}

if (await session.TryOpen(transportTransaction, contextBag).ConfigureAwait(false))
if (await sessionAdapter.TryOpen(transportTransaction, contextBag).ConfigureAwait(false))
{
return;
}

await session.Open(contextBag).ConfigureAwait(false);
await sessionAdapter.Open(contextBag).ConfigureAwait(false);
}

/// <summary>
/// Tries to open the storage session with the provided outbox transaction.
/// </summary>
/// <param name="session">The storage session.</param>
/// <param name="transaction">Outbox transaction.</param>
/// <param name="context">Context.</param>
/// <returns><c>true</c> when the session was opened; otherwise <c>false</c>.</returns>
public static Task<bool> TryOpen(this CompletableSynchronizedStorageSession session,
OutboxTransaction transaction, ContextBag context) =>
((CompletableSynchronizedStorageSessionAdapter)session).TryOpen(transaction, context);

/// <summary>
/// Tries to open the storage session with the provided transport transaction.
/// </summary>
/// <param name="session">The storage session.</param>
/// <param name="transportTransaction">Transport transaction.</param>
/// <param name="context">Context.</param>
/// <returns><c>true</c> when the session was opened; otherwise <c>false</c>.</returns>
public static Task<bool> TryOpen(this CompletableSynchronizedStorageSession session,
TransportTransaction transportTransaction, ContextBag context) =>
((CompletableSynchronizedStorageSessionAdapter)session).TryOpen(transportTransaction, context);

/// <summary>
/// Opens the storage session.
/// </summary>
/// <param name="session">The storage session.</param>
/// <param name="context">The context information.</param>
public static Task Open(this CompletableSynchronizedStorageSession session, ContextBag context) =>
((CompletableSynchronizedStorageSessionAdapter)session).Open(context);

/// <summary>
/// Returns the underlying, persistence specific, storage session.
/// </summary>
/// <param name="session">The storage session.</param>
public static CompletableSynchronizedStorageSession GetAdaptedSession(this CompletableSynchronizedStorageSession session) =>
((CompletableSynchronizedStorageSessionAdapter)session).AdaptedSession;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ internal SynchronizedStorage()
protected internal override void Setup(FeatureConfigurationContext context)
{
context.Container.ConfigureComponent<SynchronizedStorageSession>(
builder => builder.Build<CompletableSynchronizedStorageSession>().GetAdaptedSession(),
builder => builder.Build<CompletableSynchronizedStorageSessionAdapter>().AdaptedSession,
DependencyLifecycle.InstancePerUnitOfWork);
}
}
Expand Down

0 comments on commit 1e12126

Please sign in to comment.