Skip to content

Commit

Permalink
Distributed transaction fixes (#76310)
Browse files Browse the repository at this point in the history
* Retake lock when using a dependent transaction from a
  TransactionScope (#76010).
* Reset TransactionTransmitter and Receiver before reusing them
  (#76010).
* Increase MSDTC startup timeout from 2.5 to 30 seconds (#75822)

Fixes #76010
Fixes #75822
  • Loading branch information
roji authored Sep 30, 2022
1 parent 9bc025d commit ff2acd0
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ internal sealed class DtcProxyShimFactory
private readonly ConcurrentQueue<ITransactionTransmitter> _cachedTransmitters = new();
private readonly ConcurrentQueue<ITransactionReceiver> _cachedReceivers = new();

private static readonly int s_maxCachedInterfaces = Environment.ProcessorCount * 2;

private readonly EventWaitHandle _eventHandle;

private ITransactionDispenser _transactionDispenser = null!; // Late-initialized in ConnectToProxy
Expand Down Expand Up @@ -350,7 +352,13 @@ internal ITransactionTransmitter GetCachedTransmitter(ITransaction transaction)
}

internal void ReturnCachedTransmitter(ITransactionTransmitter transmitter)
=> _cachedTransmitters.Enqueue(transmitter);
{
if (_cachedTransmitters.Count < s_maxCachedInterfaces)
{
transmitter.Reset();
_cachedTransmitters.Enqueue(transmitter);
}
}

internal ITransactionReceiver GetCachedReceiver()
{
Expand All @@ -366,5 +374,11 @@ internal ITransactionReceiver GetCachedReceiver()
}

internal void ReturnCachedReceiver(ITransactionReceiver receiver)
=> _cachedReceivers.Enqueue(receiver);
{
if (_cachedReceivers.Count < s_maxCachedInterfaces)
{
receiver.Reset();
_cachedReceivers.Enqueue(receiver);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1867,7 +1867,6 @@ Guid promoterType
return false;
}


internal override void CompleteBlockingClone(InternalTransaction tx)
{
// First try to complete one of the internal blocking clones
Expand Down Expand Up @@ -1900,17 +1899,23 @@ internal override void CompleteBlockingClone(InternalTransaction tx)
Monitor.Exit(tx);
try
{
dtx.Complete();
try
{
dtx.Complete();
}
finally
{
dtx.Dispose();
}
}
finally
{
dtx.Dispose();
Monitor.Enter(tx);
}
}
}
}


internal override void CompleteAbortingClone(InternalTransaction tx)
{
// If we have a phase1Volatile.VolatileDemux, we have a phase1 volatile enlistment
Expand All @@ -1937,11 +1942,18 @@ internal override void CompleteAbortingClone(InternalTransaction tx)
Monitor.Exit(tx);
try
{
dtx.Complete();
try
{
dtx.Complete();
}
finally
{
dtx.Dispose();
}
}
finally
{
dtx.Dispose();
Monitor.Enter(tx);
}
}
}
Expand Down
85 changes: 58 additions & 27 deletions src/libraries/System.Transactions.Local/tests/OleTxTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,17 @@ public void Volatile_and_durable_enlistments(int volatileCount)

[ConditionalFact(nameof(IsRemoteExecutorSupportedAndNotNano))]
public void Promotion()
=> PromotionCore();

// #76010
[ConditionalFact(nameof(IsRemoteExecutorSupportedAndNotNano))]
public void Promotion_twice()
{
PromotionCore();
PromotionCore();
}

private void PromotionCore()
{
Test(() =>
{
Expand Down Expand Up @@ -203,28 +214,30 @@ public void Promotion()
static void Remote1(string propagationTokenFilePath)
=> Test(() =>
{
using var tx = new CommittableTransaction();

var outcomeEvent = new AutoResetEvent(false);
var enlistment = new TestEnlistment(Phase1Vote.Prepared, EnlistmentOutcome.Committed, outcomeReceived: outcomeEvent);
tx.EnlistDurable(Guid.NewGuid(), enlistment, EnlistmentOptions.None);

// We now have an OleTx transaction. Save its propagation token to disk so that the main process can read it when promoting.
byte[] propagationToken = TransactionInterop.GetTransmitterPropagationToken(tx);
File.WriteAllBytes(propagationTokenFilePath, propagationToken);
using (var tx = new CommittableTransaction())
{
var enlistment = new TestEnlistment(Phase1Vote.Prepared, EnlistmentOutcome.Committed, outcomeReceived: outcomeEvent);
tx.EnlistDurable(Guid.NewGuid(), enlistment, EnlistmentOptions.None);

// Signal to the main process that the propagation token is ready to be read
using var waitHandle1 = new EventWaitHandle(initialState: false, EventResetMode.ManualReset, "System.Transactions.Tests.OleTxTests.Promotion1");
waitHandle1.Set();
// We now have an OleTx transaction. Save its propagation token to disk so that the main process can read it when promoting.
byte[] propagationToken = TransactionInterop.GetTransmitterPropagationToken(tx);
File.WriteAllBytes(propagationTokenFilePath, propagationToken);

// The main process will now import our transaction via the propagation token, and propagate it to a 2nd process.
// In the main process the transaction is delegated; we're the one who started it, and so we're the one who need to Commit.
// When Commit() is called in the main process, that will trigger a SinglePhaseCommit on the PSPE which represents us. In SQL Server this
// contacts the DB to actually commit the transaction with MSDTC. In this simulation we'll just use the wait handle again to trigger this.
using var waitHandle3 = new EventWaitHandle(initialState: false, EventResetMode.ManualReset, "System.Transactions.Tests.OleTxTests.Promotion3");
Assert.True(waitHandle3.WaitOne(Timeout));
// Signal to the main process that the propagation token is ready to be read
using var waitHandle1 = new EventWaitHandle(initialState: false, EventResetMode.ManualReset, "System.Transactions.Tests.OleTxTests.Promotion1");
waitHandle1.Set();

tx.Commit();
// The main process will now import our transaction via the propagation token, and propagate it to a 2nd process.
// In the main process the transaction is delegated; we're the one who started it, and so we're the one who need to Commit.
// When Commit() is called in the main process, that will trigger a SinglePhaseCommit on the PSPE which represents us. In SQL Server this
// contacts the DB to actually commit the transaction with MSDTC. In this simulation we'll just use the wait handle again to trigger this.
using var waitHandle3 = new EventWaitHandle(initialState: false, EventResetMode.ManualReset, "System.Transactions.Tests.OleTxTests.Promotion3");
Assert.True(waitHandle3.WaitOne(Timeout));

tx.Commit();
}

// Wait for the commit to occur on our enlistment, then exit successfully.
Assert.True(outcomeEvent.WaitOne(Timeout));
Expand All @@ -234,18 +247,20 @@ static void Remote1(string propagationTokenFilePath)
static void Remote2(string exportCookieFilePath)
=> Test(() =>
{
var outcomeEvent = new AutoResetEvent(false);

// Load the export cookie and enlist durably
byte[] exportCookie = File.ReadAllBytes(exportCookieFilePath);
using var tx = TransactionInterop.GetTransactionFromExportCookie(exportCookie);

// Now enlist durably. This triggers promotion of the first PSPE, reading the propagation token.
var outcomeEvent = new AutoResetEvent(false);
var enlistment = new TestEnlistment(Phase1Vote.Prepared, EnlistmentOutcome.Committed, outcomeReceived: outcomeEvent);
tx.EnlistDurable(Guid.NewGuid(), enlistment, EnlistmentOptions.None);
using (var tx = TransactionInterop.GetTransactionFromExportCookie(exportCookie))
{
// Now enlist durably. This triggers promotion of the first PSPE, reading the propagation token.
var enlistment = new TestEnlistment(Phase1Vote.Prepared, EnlistmentOutcome.Committed, outcomeReceived: outcomeEvent);
tx.EnlistDurable(Guid.NewGuid(), enlistment, EnlistmentOptions.None);

// Signal to the main process that we're enlisted and ready to commit
using var waitHandle = new EventWaitHandle(initialState: false, EventResetMode.ManualReset, "System.Transactions.Tests.OleTxTests.Promotion2");
waitHandle.Set();
// Signal to the main process that we're enlisted and ready to commit
using var waitHandle = new EventWaitHandle(initialState: false, EventResetMode.ManualReset, "System.Transactions.Tests.OleTxTests.Promotion2");
waitHandle.Set();
}

// Wait for the main process to commit the transaction
Assert.True(outcomeEvent.WaitOne(Timeout));
Expand Down Expand Up @@ -414,6 +429,22 @@ public void TransmitterPropagationToken()
Assert.Equal(tx.TransactionInformation.DistributedIdentifier, tx2.TransactionInformation.DistributedIdentifier);
});

// #76010
[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsNotWindowsNanoServer))]
public void TransactionScope_with_DependentTransaction()
=> Test(() =>
{
using var committableTransaction = new CommittableTransaction();
var propagationToken = TransactionInterop.GetTransmitterPropagationToken(committableTransaction);

var dependentTransaction = TransactionInterop.GetTransactionFromTransmitterPropagationToken(propagationToken);

using (var scope = new TransactionScope(dependentTransaction))
{
scope.Complete();
}
});

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsNotWindowsNanoServer))]
public void GetExportCookie()
=> Test(() =>
Expand Down Expand Up @@ -472,7 +503,7 @@ private static void Test(Action action)
// In CI, we sometimes get XACT_E_TMNOTAVAILABLE; when it happens, it's typically on the very first
// attempt to connect to MSDTC (flaky/slow on-demand startup of MSDTC), though not only.
// This catches that error and retries.
int nRetries = 5;
int nRetries = 60;

while (true)
{
Expand Down

0 comments on commit ff2acd0

Please sign in to comment.