From a64318b0e3533108173367396bd721ea57171044 Mon Sep 17 00:00:00 2001 From: Shay Rojansky Date: Wed, 28 Sep 2022 13:54:34 +0200 Subject: [PATCH 1/2] Distributed transaction fixes * Retake lock when using a dependent transaction from a TransactionScope (#76010). * Reset TransactionTransmitter and Receiver before reusing them (#76010). * Increase MSDTC startup timeout from 2.5 to 30 seconds (#75822) Fixes #76010 Fixes #75822 --- .../DtcProxyShim/DtcProxyShimFactory.cs | 18 +++- .../System/Transactions/TransactionState.cs | 24 ++++-- .../tests/OleTxTests.cs | 85 +++++++++++++------ 3 files changed, 92 insertions(+), 35 deletions(-) diff --git a/src/libraries/System.Transactions.Local/src/System/Transactions/DtcProxyShim/DtcProxyShimFactory.cs b/src/libraries/System.Transactions.Local/src/System/Transactions/DtcProxyShim/DtcProxyShimFactory.cs index 0a65a8a72a18b..4a2e0f780879a 100644 --- a/src/libraries/System.Transactions.Local/src/System/Transactions/DtcProxyShim/DtcProxyShimFactory.cs +++ b/src/libraries/System.Transactions.Local/src/System/Transactions/DtcProxyShim/DtcProxyShimFactory.cs @@ -30,6 +30,8 @@ internal sealed class DtcProxyShimFactory private readonly ConcurrentQueue _cachedTransmitters = new(); private readonly ConcurrentQueue _cachedReceivers = new(); + private static readonly int _maxCachedInterfaces = Environment.ProcessorCount * 2; + private readonly EventWaitHandle _eventHandle; private ITransactionDispenser _transactionDispenser = null!; // Late-initialized in ConnectToProxy @@ -350,7 +352,13 @@ internal ITransactionTransmitter GetCachedTransmitter(ITransaction transaction) } internal void ReturnCachedTransmitter(ITransactionTransmitter transmitter) - => _cachedTransmitters.Enqueue(transmitter); + { + if (_cachedTransmitters.Count < _maxCachedInterfaces) + { + transmitter.Reset(); + _cachedTransmitters.Enqueue(transmitter); + } + } internal ITransactionReceiver GetCachedReceiver() { @@ -366,5 +374,11 @@ internal ITransactionReceiver GetCachedReceiver() } internal void ReturnCachedReceiver(ITransactionReceiver receiver) - => _cachedReceivers.Enqueue(receiver); + { + if (_cachedReceivers.Count < _maxCachedInterfaces) + { + receiver.Reset(); + _cachedReceivers.Enqueue(receiver); + } + } } diff --git a/src/libraries/System.Transactions.Local/src/System/Transactions/TransactionState.cs b/src/libraries/System.Transactions.Local/src/System/Transactions/TransactionState.cs index 331e679db04ea..1e0cdd68ff163 100644 --- a/src/libraries/System.Transactions.Local/src/System/Transactions/TransactionState.cs +++ b/src/libraries/System.Transactions.Local/src/System/Transactions/TransactionState.cs @@ -1867,7 +1867,6 @@ Guid promoterType return false; } - internal override void CompleteBlockingClone(InternalTransaction tx) { // First try to complete one of the internal blocking clones @@ -1900,17 +1899,23 @@ internal override void CompleteBlockingClone(InternalTransaction tx) Monitor.Exit(tx); try { - dtx.Complete(); + try + { + dtx.Complete(); + } + finally + { + dtx.Dispose(); + } } finally { - dtx.Dispose(); + Monitor.Enter(tx); } } } } - internal override void CompleteAbortingClone(InternalTransaction tx) { // If we have a phase1Volatile.VolatileDemux, we have a phase1 volatile enlistment @@ -1937,11 +1942,18 @@ internal override void CompleteAbortingClone(InternalTransaction tx) Monitor.Exit(tx); try { - dtx.Complete(); + try + { + dtx.Complete(); + } + finally + { + dtx.Dispose(); + } } finally { - dtx.Dispose(); + Monitor.Enter(tx); } } } diff --git a/src/libraries/System.Transactions.Local/tests/OleTxTests.cs b/src/libraries/System.Transactions.Local/tests/OleTxTests.cs index 71893a29f98bd..739ca93afeb2d 100644 --- a/src/libraries/System.Transactions.Local/tests/OleTxTests.cs +++ b/src/libraries/System.Transactions.Local/tests/OleTxTests.cs @@ -109,6 +109,17 @@ public void Volatile_and_durable_enlistments(int volatileCount) [ConditionalFact(nameof(IsRemoteExecutorSupportedAndNotNano))] public void Promotion() + => PromotionCore(); + + // #76010 + [ConditionalFact(nameof(IsRemoteExecutorSupportedAndNotNano))] + public void Promotion_twice() + { + PromotionCore(); + PromotionCore(); + } + + private void PromotionCore() { Test(() => { @@ -203,28 +214,30 @@ public void Promotion() static void Remote1(string propagationTokenFilePath) => Test(() => { - using var tx = new CommittableTransaction(); - var outcomeEvent = new AutoResetEvent(false); - var enlistment = new TestEnlistment(Phase1Vote.Prepared, EnlistmentOutcome.Committed, outcomeReceived: outcomeEvent); - tx.EnlistDurable(Guid.NewGuid(), enlistment, EnlistmentOptions.None); - // We now have an OleTx transaction. Save its propagation token to disk so that the main process can read it when promoting. - byte[] propagationToken = TransactionInterop.GetTransmitterPropagationToken(tx); - File.WriteAllBytes(propagationTokenFilePath, propagationToken); + using (var tx = new CommittableTransaction()) + { + var enlistment = new TestEnlistment(Phase1Vote.Prepared, EnlistmentOutcome.Committed, outcomeReceived: outcomeEvent); + tx.EnlistDurable(Guid.NewGuid(), enlistment, EnlistmentOptions.None); - // Signal to the main process that the propagation token is ready to be read - using var waitHandle1 = new EventWaitHandle(initialState: false, EventResetMode.ManualReset, "System.Transactions.Tests.OleTxTests.Promotion1"); - waitHandle1.Set(); + // We now have an OleTx transaction. Save its propagation token to disk so that the main process can read it when promoting. + byte[] propagationToken = TransactionInterop.GetTransmitterPropagationToken(tx); + File.WriteAllBytes(propagationTokenFilePath, propagationToken); - // The main process will now import our transaction via the propagation token, and propagate it to a 2nd process. - // In the main process the transaction is delegated; we're the one who started it, and so we're the one who need to Commit. - // When Commit() is called in the main process, that will trigger a SinglePhaseCommit on the PSPE which represents us. In SQL Server this - // contacts the DB to actually commit the transaction with MSDTC. In this simulation we'll just use the wait handle again to trigger this. - using var waitHandle3 = new EventWaitHandle(initialState: false, EventResetMode.ManualReset, "System.Transactions.Tests.OleTxTests.Promotion3"); - Assert.True(waitHandle3.WaitOne(Timeout)); + // Signal to the main process that the propagation token is ready to be read + using var waitHandle1 = new EventWaitHandle(initialState: false, EventResetMode.ManualReset, "System.Transactions.Tests.OleTxTests.Promotion1"); + waitHandle1.Set(); - tx.Commit(); + // The main process will now import our transaction via the propagation token, and propagate it to a 2nd process. + // In the main process the transaction is delegated; we're the one who started it, and so we're the one who need to Commit. + // When Commit() is called in the main process, that will trigger a SinglePhaseCommit on the PSPE which represents us. In SQL Server this + // contacts the DB to actually commit the transaction with MSDTC. In this simulation we'll just use the wait handle again to trigger this. + using var waitHandle3 = new EventWaitHandle(initialState: false, EventResetMode.ManualReset, "System.Transactions.Tests.OleTxTests.Promotion3"); + Assert.True(waitHandle3.WaitOne(Timeout)); + + tx.Commit(); + } // Wait for the commit to occur on our enlistment, then exit successfully. Assert.True(outcomeEvent.WaitOne(Timeout)); @@ -234,18 +247,20 @@ static void Remote1(string propagationTokenFilePath) static void Remote2(string exportCookieFilePath) => Test(() => { + var outcomeEvent = new AutoResetEvent(false); + // Load the export cookie and enlist durably byte[] exportCookie = File.ReadAllBytes(exportCookieFilePath); - using var tx = TransactionInterop.GetTransactionFromExportCookie(exportCookie); - - // Now enlist durably. This triggers promotion of the first PSPE, reading the propagation token. - var outcomeEvent = new AutoResetEvent(false); - var enlistment = new TestEnlistment(Phase1Vote.Prepared, EnlistmentOutcome.Committed, outcomeReceived: outcomeEvent); - tx.EnlistDurable(Guid.NewGuid(), enlistment, EnlistmentOptions.None); + using (var tx = TransactionInterop.GetTransactionFromExportCookie(exportCookie)) + { + // Now enlist durably. This triggers promotion of the first PSPE, reading the propagation token. + var enlistment = new TestEnlistment(Phase1Vote.Prepared, EnlistmentOutcome.Committed, outcomeReceived: outcomeEvent); + tx.EnlistDurable(Guid.NewGuid(), enlistment, EnlistmentOptions.None); - // Signal to the main process that we're enlisted and ready to commit - using var waitHandle = new EventWaitHandle(initialState: false, EventResetMode.ManualReset, "System.Transactions.Tests.OleTxTests.Promotion2"); - waitHandle.Set(); + // Signal to the main process that we're enlisted and ready to commit + using var waitHandle = new EventWaitHandle(initialState: false, EventResetMode.ManualReset, "System.Transactions.Tests.OleTxTests.Promotion2"); + waitHandle.Set(); + } // Wait for the main process to commit the transaction Assert.True(outcomeEvent.WaitOne(Timeout)); @@ -414,6 +429,22 @@ public void TransmitterPropagationToken() Assert.Equal(tx.TransactionInformation.DistributedIdentifier, tx2.TransactionInformation.DistributedIdentifier); }); + // #76010 + [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsNotWindowsNanoServer))] + public void TransactionScope_with_DependentTransaction() + => Test(() => + { + using var committableTransaction = new CommittableTransaction(); + var propagationToken = TransactionInterop.GetTransmitterPropagationToken(committableTransaction); + + var dependentTransaction = TransactionInterop.GetTransactionFromTransmitterPropagationToken(propagationToken); + + using (var scope = new TransactionScope(dependentTransaction)) + { + scope.Complete(); + } + }); + [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsNotWindowsNanoServer))] public void GetExportCookie() => Test(() => @@ -472,7 +503,7 @@ private static void Test(Action action) // In CI, we sometimes get XACT_E_TMNOTAVAILABLE; when it happens, it's typically on the very first // attempt to connect to MSDTC (flaky/slow on-demand startup of MSDTC), though not only. // This catches that error and retries. - int nRetries = 5; + int nRetries = 60; while (true) { From 5495945826b8dc3e52ce434ed3784ce947398c0c Mon Sep 17 00:00:00 2001 From: Shay Rojansky Date: Thu, 29 Sep 2022 19:19:44 +0200 Subject: [PATCH 2/2] Fix member naming --- .../System/Transactions/DtcProxyShim/DtcProxyShimFactory.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/libraries/System.Transactions.Local/src/System/Transactions/DtcProxyShim/DtcProxyShimFactory.cs b/src/libraries/System.Transactions.Local/src/System/Transactions/DtcProxyShim/DtcProxyShimFactory.cs index 4a2e0f780879a..35454d05adae4 100644 --- a/src/libraries/System.Transactions.Local/src/System/Transactions/DtcProxyShim/DtcProxyShimFactory.cs +++ b/src/libraries/System.Transactions.Local/src/System/Transactions/DtcProxyShim/DtcProxyShimFactory.cs @@ -30,7 +30,7 @@ internal sealed class DtcProxyShimFactory private readonly ConcurrentQueue _cachedTransmitters = new(); private readonly ConcurrentQueue _cachedReceivers = new(); - private static readonly int _maxCachedInterfaces = Environment.ProcessorCount * 2; + private static readonly int s_maxCachedInterfaces = Environment.ProcessorCount * 2; private readonly EventWaitHandle _eventHandle; @@ -353,7 +353,7 @@ internal ITransactionTransmitter GetCachedTransmitter(ITransaction transaction) internal void ReturnCachedTransmitter(ITransactionTransmitter transmitter) { - if (_cachedTransmitters.Count < _maxCachedInterfaces) + if (_cachedTransmitters.Count < s_maxCachedInterfaces) { transmitter.Reset(); _cachedTransmitters.Enqueue(transmitter); @@ -375,7 +375,7 @@ internal ITransactionReceiver GetCachedReceiver() internal void ReturnCachedReceiver(ITransactionReceiver receiver) { - if (_cachedReceivers.Count < _maxCachedInterfaces) + if (_cachedReceivers.Count < s_maxCachedInterfaces) { receiver.Reset(); _cachedReceivers.Enqueue(receiver);