Skip to content

Commit

Permalink
Automatically enable ImplicitDistributedTransactions on windows when …
Browse files Browse the repository at this point in the history
…transaction mode requires it (#6850)

* Addd failing test

* Automatically enable ImplicitDistributedTransactions on windows when transaction mode requires it

* Tweaks

---------

Co-authored-by: Brandon Ording <bording@gmail.com>
  • Loading branch information
andreasohlund and bording authored Sep 14, 2023
1 parent 8ca3028 commit ad78c37
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Transactions;
using NServiceBus.AcceptanceTests.Tx;
using Transport;

public class FakeTransport : TransportDefinition
Expand All @@ -20,6 +22,19 @@ public override Task<TransportInfrastructure> Initialize(HostSettings hostSettin
{
StartupSequence.Add($"{nameof(TransportDefinition)}.{nameof(Initialize)}");

try
{
using var scope = new TransactionScope(TransactionScopeOption.RequiresNew, TransactionScopeAsyncFlowOption.Enabled);

FakePromotableResourceManager.ForceDtc();
DtcIsAvailable = true;
}
catch (Exception ex)
{
DtcIsAvailable = false;
DtcCheckException = ex;
}

var infrastructure = new FakeTransportInfrastructure(StartupSequence, hostSettings, receivers, this);

infrastructure.ConfigureSendInfrastructure();
Expand Down Expand Up @@ -62,5 +77,9 @@ public void RaiseExceptionOnTransportDispose(Exception exception)
}

public Action<(QueueAddress[] receivingAddresses, string[] sendingAddresses, bool setupInfrastructure)> OnTransportInitialize { get; set; } = _ => { };

public bool DtcIsAvailable { get; set; }

public Exception DtcCheckException { get; private set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
namespace NServiceBus.AcceptanceTests.Core.UnitOfWork.TransactionScope
{
using System;
using System.Threading.Tasks;
using AcceptanceTesting;
using EndpointTemplates;
using FakeTransport;
using NUnit.Framework;

public class When_transport_supports_transaction_scope_on_windows : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_support_dtc()
{
if (!OperatingSystem.IsWindows())
{
Assert.Ignore("Only relevant on windows");
}

var fakeTransport = new FakeTransport
{
TransportTransactionMode = TransportTransactionMode.TransactionScope
};

await Scenario.Define<ScenarioContext>()
.WithEndpoint<TransactionScopeEndpoint>(b => b.CustomConfig(c =>
{
c.UseTransport(fakeTransport);
}))
.Done(c => c.EndpointsStarted)
.Run();

Assert.True(fakeTransport.DtcIsAvailable, fakeTransport.DtcCheckException?.Message);
}

public class TransactionScopeEndpoint : EndpointConfigurationBuilder
{
public TransactionScopeEndpoint()
{
EndpointSetup<DefaultServer>();
}
}
}
}
55 changes: 17 additions & 38 deletions src/NServiceBus.AcceptanceTests/Tx/FakePromotableResourceManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,45 +5,24 @@

public class FakePromotableResourceManager : IPromotableSinglePhaseNotification, IEnlistmentNotification
{
public void Prepare(PreparingEnlistment preparingEnlistment)
{
preparingEnlistment.Prepared();
}

public void Commit(Enlistment enlistment)
{
enlistment.Done();
}

public void Rollback(Enlistment enlistment)
{
enlistment.Done();
}

public void InDoubt(Enlistment enlistment)
{
enlistment.Done();
}

public void Initialize()
{
}

public void SinglePhaseCommit(SinglePhaseEnlistment singlePhaseEnlistment)
{
singlePhaseEnlistment.Committed();
}

public void Rollback(SinglePhaseEnlistment singlePhaseEnlistment)
{
singlePhaseEnlistment.Done();
}

public byte[] Promote()
{
return TransactionInterop.GetTransmitterPropagationToken(new CommittableTransaction());
}
public void Prepare(PreparingEnlistment preparingEnlistment) => preparingEnlistment.Prepared();

public void Commit(Enlistment enlistment) => enlistment.Done();

public void Rollback(Enlistment enlistment) => enlistment.Done();

public void InDoubt(Enlistment enlistment) => enlistment.Done();

public void Initialize() { }

public void SinglePhaseCommit(SinglePhaseEnlistment singlePhaseEnlistment) => singlePhaseEnlistment.Committed();

public void Rollback(SinglePhaseEnlistment singlePhaseEnlistment) => singlePhaseEnlistment.Done();

public byte[] Promote() => TransactionInterop.GetTransmitterPropagationToken(new CommittableTransaction());

public static Guid ResourceManagerId = Guid.Parse("6f057e24-a0d8-4c95-b091-b8dc9a916fa4");

public static void ForceDtc() => Transaction.Current.EnlistDurable(ResourceManagerId, new FakePromotableResourceManager(), EnlistmentOptions.None);
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
namespace NServiceBus.AcceptanceTests.Tx
{
using System;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using System.Transactions;
using AcceptanceTesting;
Expand All @@ -27,11 +26,10 @@ public async Task Should_enlist_the_receive_in_the_dtc_tx()
[Test]
public void Basic_assumptions_promotable_should_fail_if_durable_already_exists()
{

if (OperatingSystem.IsWindows())
{
// This test only work on Windows
TransactionManager.ImplicitDistributedTransactions = true;

using (var tx = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
{
Transaction.Current.EnlistDurable(FakePromotableResourceManager.ResourceManagerId, new FakePromotableResourceManager(), EnlistmentOptions.None);
Expand Down
6 changes: 6 additions & 0 deletions src/NServiceBus.Core/Transports/TransportSeam.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Transactions;
using Microsoft.Extensions.DependencyInjection;
using Settings;
using Transport;
Expand Down Expand Up @@ -33,6 +34,11 @@ public void Configure(ReceiveSettings[] receivers)

public async Task<TransportInfrastructure> CreateTransportInfrastructure(CancellationToken cancellationToken = default)
{
if (OperatingSystem.IsWindows() && TransportDefinition.TransportTransactionMode == TransportTransactionMode.TransactionScope)
{
TransactionManager.ImplicitDistributedTransactions = true;
}

TransportInfrastructure = await TransportDefinition.Initialize(hostSettings, receivers, QueueBindings.SendingAddresses.ToArray(), cancellationToken)
.ConfigureAwait(false);

Expand Down
7 changes: 7 additions & 0 deletions src/NServiceBus.TransportTests/NServiceBusTransportTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Transactions;
using Logging;
using NUnit.Framework;
using Routing;
Expand Down Expand Up @@ -106,6 +107,12 @@ protected async Task StartPump(OnMessage onMessage, OnError onError, TransportTr
var transport = configurer.CreateTransportDefinition();

IgnoreUnsupportedTransactionModes(transport, transactionMode);

if (OperatingSystem.IsWindows() && transactionMode == TransportTransactionMode.TransactionScope)
{
TransactionManager.ImplicitDistributedTransactions = true;
}

transport.TransportTransactionMode = transactionMode;

transportInfrastructure = await configurer.Configure(transport, hostSettings, new QueueAddress(InputQueueName), ErrorQueueName, cancellationToken);
Expand Down

0 comments on commit ad78c37

Please sign in to comment.