Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release/7.0] Distributed transaction fixes #76425

Merged
merged 2 commits into from
Sep 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ internal sealed class DtcProxyShimFactory
private readonly ConcurrentQueue<ITransactionTransmitter> _cachedTransmitters = new();
private readonly ConcurrentQueue<ITransactionReceiver> _cachedReceivers = new();

private static readonly int s_maxCachedInterfaces = Environment.ProcessorCount * 2;

private readonly EventWaitHandle _eventHandle;

private ITransactionDispenser _transactionDispenser = null!; // Late-initialized in ConnectToProxy
Expand Down Expand Up @@ -350,7 +352,13 @@ internal ITransactionTransmitter GetCachedTransmitter(ITransaction transaction)
}

internal void ReturnCachedTransmitter(ITransactionTransmitter transmitter)
=> _cachedTransmitters.Enqueue(transmitter);
{
if (_cachedTransmitters.Count < s_maxCachedInterfaces)
{
transmitter.Reset();
_cachedTransmitters.Enqueue(transmitter);
}
}

internal ITransactionReceiver GetCachedReceiver()
{
Expand All @@ -366,5 +374,11 @@ internal ITransactionReceiver GetCachedReceiver()
}

internal void ReturnCachedReceiver(ITransactionReceiver receiver)
=> _cachedReceivers.Enqueue(receiver);
{
if (_cachedReceivers.Count < s_maxCachedInterfaces)
{
receiver.Reset();
_cachedReceivers.Enqueue(receiver);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1867,7 +1867,6 @@ Guid promoterType
return false;
}


internal override void CompleteBlockingClone(InternalTransaction tx)
{
// First try to complete one of the internal blocking clones
Expand Down Expand Up @@ -1900,17 +1899,23 @@ internal override void CompleteBlockingClone(InternalTransaction tx)
Monitor.Exit(tx);
try
{
dtx.Complete();
try
{
dtx.Complete();
}
finally
{
dtx.Dispose();
}
}
finally
{
dtx.Dispose();
Monitor.Enter(tx);
}
}
}
}


internal override void CompleteAbortingClone(InternalTransaction tx)
{
// If we have a phase1Volatile.VolatileDemux, we have a phase1 volatile enlistment
Expand All @@ -1937,11 +1942,18 @@ internal override void CompleteAbortingClone(InternalTransaction tx)
Monitor.Exit(tx);
try
{
dtx.Complete();
try
{
dtx.Complete();
}
finally
{
dtx.Dispose();
}
}
finally
{
dtx.Dispose();
Monitor.Enter(tx);
}
}
}
Expand Down
85 changes: 58 additions & 27 deletions src/libraries/System.Transactions.Local/tests/OleTxTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,17 @@ public void Volatile_and_durable_enlistments(int volatileCount)

[ConditionalFact(nameof(IsRemoteExecutorSupportedAndNotNano))]
public void Promotion()
=> PromotionCore();

// #76010
[ConditionalFact(nameof(IsRemoteExecutorSupportedAndNotNano))]
public void Promotion_twice()
{
PromotionCore();
PromotionCore();
}

private void PromotionCore()
{
Test(() =>
{
Expand Down Expand Up @@ -203,28 +214,30 @@ public void Promotion()
static void Remote1(string propagationTokenFilePath)
=> Test(() =>
{
using var tx = new CommittableTransaction();

var outcomeEvent = new AutoResetEvent(false);
var enlistment = new TestEnlistment(Phase1Vote.Prepared, EnlistmentOutcome.Committed, outcomeReceived: outcomeEvent);
tx.EnlistDurable(Guid.NewGuid(), enlistment, EnlistmentOptions.None);

// We now have an OleTx transaction. Save its propagation token to disk so that the main process can read it when promoting.
byte[] propagationToken = TransactionInterop.GetTransmitterPropagationToken(tx);
File.WriteAllBytes(propagationTokenFilePath, propagationToken);
using (var tx = new CommittableTransaction())
{
var enlistment = new TestEnlistment(Phase1Vote.Prepared, EnlistmentOutcome.Committed, outcomeReceived: outcomeEvent);
tx.EnlistDurable(Guid.NewGuid(), enlistment, EnlistmentOptions.None);

// Signal to the main process that the propagation token is ready to be read
using var waitHandle1 = new EventWaitHandle(initialState: false, EventResetMode.ManualReset, "System.Transactions.Tests.OleTxTests.Promotion1");
waitHandle1.Set();
// We now have an OleTx transaction. Save its propagation token to disk so that the main process can read it when promoting.
byte[] propagationToken = TransactionInterop.GetTransmitterPropagationToken(tx);
File.WriteAllBytes(propagationTokenFilePath, propagationToken);

// The main process will now import our transaction via the propagation token, and propagate it to a 2nd process.
// In the main process the transaction is delegated; we're the one who started it, and so we're the one who need to Commit.
// When Commit() is called in the main process, that will trigger a SinglePhaseCommit on the PSPE which represents us. In SQL Server this
// contacts the DB to actually commit the transaction with MSDTC. In this simulation we'll just use the wait handle again to trigger this.
using var waitHandle3 = new EventWaitHandle(initialState: false, EventResetMode.ManualReset, "System.Transactions.Tests.OleTxTests.Promotion3");
Assert.True(waitHandle3.WaitOne(Timeout));
// Signal to the main process that the propagation token is ready to be read
using var waitHandle1 = new EventWaitHandle(initialState: false, EventResetMode.ManualReset, "System.Transactions.Tests.OleTxTests.Promotion1");
waitHandle1.Set();

tx.Commit();
// The main process will now import our transaction via the propagation token, and propagate it to a 2nd process.
// In the main process the transaction is delegated; we're the one who started it, and so we're the one who need to Commit.
// When Commit() is called in the main process, that will trigger a SinglePhaseCommit on the PSPE which represents us. In SQL Server this
// contacts the DB to actually commit the transaction with MSDTC. In this simulation we'll just use the wait handle again to trigger this.
using var waitHandle3 = new EventWaitHandle(initialState: false, EventResetMode.ManualReset, "System.Transactions.Tests.OleTxTests.Promotion3");
Assert.True(waitHandle3.WaitOne(Timeout));

tx.Commit();
}

// Wait for the commit to occur on our enlistment, then exit successfully.
Assert.True(outcomeEvent.WaitOne(Timeout));
Expand All @@ -234,18 +247,20 @@ static void Remote1(string propagationTokenFilePath)
static void Remote2(string exportCookieFilePath)
=> Test(() =>
{
var outcomeEvent = new AutoResetEvent(false);

// Load the export cookie and enlist durably
byte[] exportCookie = File.ReadAllBytes(exportCookieFilePath);
using var tx = TransactionInterop.GetTransactionFromExportCookie(exportCookie);

// Now enlist durably. This triggers promotion of the first PSPE, reading the propagation token.
var outcomeEvent = new AutoResetEvent(false);
var enlistment = new TestEnlistment(Phase1Vote.Prepared, EnlistmentOutcome.Committed, outcomeReceived: outcomeEvent);
tx.EnlistDurable(Guid.NewGuid(), enlistment, EnlistmentOptions.None);
using (var tx = TransactionInterop.GetTransactionFromExportCookie(exportCookie))
{
// Now enlist durably. This triggers promotion of the first PSPE, reading the propagation token.
var enlistment = new TestEnlistment(Phase1Vote.Prepared, EnlistmentOutcome.Committed, outcomeReceived: outcomeEvent);
tx.EnlistDurable(Guid.NewGuid(), enlistment, EnlistmentOptions.None);

// Signal to the main process that we're enlisted and ready to commit
using var waitHandle = new EventWaitHandle(initialState: false, EventResetMode.ManualReset, "System.Transactions.Tests.OleTxTests.Promotion2");
waitHandle.Set();
// Signal to the main process that we're enlisted and ready to commit
using var waitHandle = new EventWaitHandle(initialState: false, EventResetMode.ManualReset, "System.Transactions.Tests.OleTxTests.Promotion2");
waitHandle.Set();
}

// Wait for the main process to commit the transaction
Assert.True(outcomeEvent.WaitOne(Timeout));
Expand Down Expand Up @@ -414,6 +429,22 @@ public void TransmitterPropagationToken()
Assert.Equal(tx.TransactionInformation.DistributedIdentifier, tx2.TransactionInformation.DistributedIdentifier);
});

// #76010
[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsNotWindowsNanoServer))]
public void TransactionScope_with_DependentTransaction()
=> Test(() =>
{
using var committableTransaction = new CommittableTransaction();
var propagationToken = TransactionInterop.GetTransmitterPropagationToken(committableTransaction);

var dependentTransaction = TransactionInterop.GetTransactionFromTransmitterPropagationToken(propagationToken);

using (var scope = new TransactionScope(dependentTransaction))
{
scope.Complete();
}
});

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsNotWindowsNanoServer))]
public void GetExportCookie()
=> Test(() =>
Expand Down Expand Up @@ -472,7 +503,7 @@ private static void Test(Action action)
// In CI, we sometimes get XACT_E_TMNOTAVAILABLE; when it happens, it's typically on the very first
// attempt to connect to MSDTC (flaky/slow on-demand startup of MSDTC), though not only.
// This catches that error and retries.
int nRetries = 5;
int nRetries = 60;

while (true)
{
Expand Down