Distributed transaction fixes (#76310)
authorShay Rojansky <roji@roji.org>
Fri, 30 Sep 2022 07:15:06 +0000 (09:15 +0200)
committerGitHub <noreply@github.com>
Fri, 30 Sep 2022 07:15:06 +0000 (09:15 +0200)
* Retake lock when using a dependent transaction from a
  TransactionScope (#76010).
* Reset TransactionTransmitter and Receiver before reusing them
  (#76010).
* Increase MSDTC startup timeout from 2.5 to 30 seconds (#75822)

Fixes #76010
Fixes #75822

src/libraries/System.Transactions.Local/src/System/Transactions/DtcProxyShim/DtcProxyShimFactory.cs
src/libraries/System.Transactions.Local/src/System/Transactions/TransactionState.cs
src/libraries/System.Transactions.Local/tests/OleTxTests.cs

index 0a65a8a..35454d0 100644 (file)
@@ -30,6 +30,8 @@ internal sealed class DtcProxyShimFactory
     private readonly ConcurrentQueue<ITransactionTransmitter> _cachedTransmitters = new();
     private readonly ConcurrentQueue<ITransactionReceiver> _cachedReceivers = new();
 
+    private static readonly int s_maxCachedInterfaces = Environment.ProcessorCount * 2;
+
     private readonly EventWaitHandle _eventHandle;
 
     private ITransactionDispenser _transactionDispenser = null!; // Late-initialized in ConnectToProxy
@@ -350,7 +352,13 @@ internal sealed class DtcProxyShimFactory
     }
 
     internal void ReturnCachedTransmitter(ITransactionTransmitter transmitter)
-        => _cachedTransmitters.Enqueue(transmitter);
+    {
+        if (_cachedTransmitters.Count < s_maxCachedInterfaces)
+        {
+            transmitter.Reset();
+            _cachedTransmitters.Enqueue(transmitter);
+        }
+    }
 
     internal ITransactionReceiver GetCachedReceiver()
     {
@@ -366,5 +374,11 @@ internal sealed class DtcProxyShimFactory
     }
 
     internal void ReturnCachedReceiver(ITransactionReceiver receiver)
-        => _cachedReceivers.Enqueue(receiver);
+    {
+        if (_cachedReceivers.Count < s_maxCachedInterfaces)
+        {
+            receiver.Reset();
+            _cachedReceivers.Enqueue(receiver);
+        }
+    }
 }
index 331e679..1e0cdd6 100644 (file)
@@ -1867,7 +1867,6 @@ namespace System.Transactions
             return false;
         }
 
-
         internal override void CompleteBlockingClone(InternalTransaction tx)
         {
             // First try to complete one of the internal blocking clones
@@ -1900,17 +1899,23 @@ namespace System.Transactions
                     Monitor.Exit(tx);
                     try
                     {
-                        dtx.Complete();
+                        try
+                        {
+                            dtx.Complete();
+                        }
+                        finally
+                        {
+                            dtx.Dispose();
+                        }
                     }
                     finally
                     {
-                        dtx.Dispose();
+                        Monitor.Enter(tx);
                     }
                 }
             }
         }
 
-
         internal override void CompleteAbortingClone(InternalTransaction tx)
         {
             // If we have a phase1Volatile.VolatileDemux, we have a phase1 volatile enlistment
@@ -1937,11 +1942,18 @@ namespace System.Transactions
                     Monitor.Exit(tx);
                     try
                     {
-                        dtx.Complete();
+                        try
+                        {
+                            dtx.Complete();
+                        }
+                        finally
+                        {
+                            dtx.Dispose();
+                        }
                     }
                     finally
                     {
-                        dtx.Dispose();
+                        Monitor.Enter(tx);
                     }
                 }
             }
index 71893a2..739ca93 100644 (file)
@@ -109,6 +109,17 @@ public class OleTxTests : IClassFixture<OleTxTests.OleTxFixture>
 
     [ConditionalFact(nameof(IsRemoteExecutorSupportedAndNotNano))]
     public void Promotion()
+        => PromotionCore();
+
+    // #76010
+    [ConditionalFact(nameof(IsRemoteExecutorSupportedAndNotNano))]
+    public void Promotion_twice()
+    {
+        PromotionCore();
+        PromotionCore();
+    }
+
+    private void PromotionCore()
     {
         Test(() =>
         {
@@ -203,28 +214,30 @@ public class OleTxTests : IClassFixture<OleTxTests.OleTxFixture>
         static void Remote1(string propagationTokenFilePath)
             => Test(() =>
             {
-                using var tx = new CommittableTransaction();
-
                 var outcomeEvent = new AutoResetEvent(false);
-                var enlistment = new TestEnlistment(Phase1Vote.Prepared, EnlistmentOutcome.Committed, outcomeReceived: outcomeEvent);
-                tx.EnlistDurable(Guid.NewGuid(), enlistment, EnlistmentOptions.None);
 
-                // We now have an OleTx transaction. Save its propagation token to disk so that the main process can read it when promoting.
-                byte[] propagationToken = TransactionInterop.GetTransmitterPropagationToken(tx);
-                File.WriteAllBytes(propagationTokenFilePath, propagationToken);
+                using (var tx = new CommittableTransaction())
+                {
+                    var enlistment = new TestEnlistment(Phase1Vote.Prepared, EnlistmentOutcome.Committed, outcomeReceived: outcomeEvent);
+                    tx.EnlistDurable(Guid.NewGuid(), enlistment, EnlistmentOptions.None);
 
-                // Signal to the main process that the propagation token is ready to be read
-                using var waitHandle1 = new EventWaitHandle(initialState: false, EventResetMode.ManualReset, "System.Transactions.Tests.OleTxTests.Promotion1");
-                waitHandle1.Set();
+                    // We now have an OleTx transaction. Save its propagation token to disk so that the main process can read it when promoting.
+                    byte[] propagationToken = TransactionInterop.GetTransmitterPropagationToken(tx);
+                    File.WriteAllBytes(propagationTokenFilePath, propagationToken);
 
-                // The main process will now import our transaction via the propagation token, and propagate it to a 2nd process.
-                // In the main process the transaction is delegated; we're the one who started it, and so we're the one who need to Commit.
-                // When Commit() is called in the main process, that will trigger a SinglePhaseCommit on the PSPE which represents us. In SQL Server this
-                // contacts the DB to actually commit the transaction with MSDTC. In this simulation we'll just use the wait handle again to trigger this.
-                using var waitHandle3 = new EventWaitHandle(initialState: false, EventResetMode.ManualReset, "System.Transactions.Tests.OleTxTests.Promotion3");
-                Assert.True(waitHandle3.WaitOne(Timeout));
+                    // Signal to the main process that the propagation token is ready to be read
+                    using var waitHandle1 = new EventWaitHandle(initialState: false, EventResetMode.ManualReset, "System.Transactions.Tests.OleTxTests.Promotion1");
+                    waitHandle1.Set();
 
-                tx.Commit();
+                    // The main process will now import our transaction via the propagation token, and propagate it to a 2nd process.
+                    // In the main process the transaction is delegated; we're the one who started it, and so we're the one who need to Commit.
+                    // When Commit() is called in the main process, that will trigger a SinglePhaseCommit on the PSPE which represents us. In SQL Server this
+                    // contacts the DB to actually commit the transaction with MSDTC. In this simulation we'll just use the wait handle again to trigger this.
+                    using var waitHandle3 = new EventWaitHandle(initialState: false, EventResetMode.ManualReset, "System.Transactions.Tests.OleTxTests.Promotion3");
+                    Assert.True(waitHandle3.WaitOne(Timeout));
+
+                    tx.Commit();
+                }
 
                 // Wait for the commit to occur on our enlistment, then exit successfully.
                 Assert.True(outcomeEvent.WaitOne(Timeout));
@@ -234,18 +247,20 @@ public class OleTxTests : IClassFixture<OleTxTests.OleTxFixture>
         static void Remote2(string exportCookieFilePath)
             => Test(() =>
             {
+                var outcomeEvent = new AutoResetEvent(false);
+
                 // Load the export cookie and enlist durably
                 byte[] exportCookie = File.ReadAllBytes(exportCookieFilePath);
-                using var tx = TransactionInterop.GetTransactionFromExportCookie(exportCookie);
-
-                // Now enlist durably. This triggers promotion of the first PSPE, reading the propagation token.
-                var outcomeEvent = new AutoResetEvent(false);
-                var enlistment = new TestEnlistment(Phase1Vote.Prepared, EnlistmentOutcome.Committed, outcomeReceived: outcomeEvent);
-                tx.EnlistDurable(Guid.NewGuid(), enlistment, EnlistmentOptions.None);
+                using (var tx = TransactionInterop.GetTransactionFromExportCookie(exportCookie))
+                {
+                    // Now enlist durably. This triggers promotion of the first PSPE, reading the propagation token.
+                    var enlistment = new TestEnlistment(Phase1Vote.Prepared, EnlistmentOutcome.Committed, outcomeReceived: outcomeEvent);
+                    tx.EnlistDurable(Guid.NewGuid(), enlistment, EnlistmentOptions.None);
 
-                // Signal to the main process that we're enlisted and ready to commit
-                using var waitHandle = new EventWaitHandle(initialState: false, EventResetMode.ManualReset, "System.Transactions.Tests.OleTxTests.Promotion2");
-                waitHandle.Set();
+                    // Signal to the main process that we're enlisted and ready to commit
+                    using var waitHandle = new EventWaitHandle(initialState: false, EventResetMode.ManualReset, "System.Transactions.Tests.OleTxTests.Promotion2");
+                    waitHandle.Set();
+                }
 
                 // Wait for the main process to commit the transaction
                 Assert.True(outcomeEvent.WaitOne(Timeout));
@@ -414,6 +429,22 @@ public class OleTxTests : IClassFixture<OleTxTests.OleTxFixture>
             Assert.Equal(tx.TransactionInformation.DistributedIdentifier, tx2.TransactionInformation.DistributedIdentifier);
         });
 
+    // #76010
+    [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsNotWindowsNanoServer))]
+    public void TransactionScope_with_DependentTransaction()
+    => Test(() =>
+    {
+        using var committableTransaction = new CommittableTransaction();
+        var propagationToken = TransactionInterop.GetTransmitterPropagationToken(committableTransaction);
+
+        var dependentTransaction = TransactionInterop.GetTransactionFromTransmitterPropagationToken(propagationToken);
+
+        using (var scope = new TransactionScope(dependentTransaction))
+        {
+            scope.Complete();
+        }
+    });
+
     [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsNotWindowsNanoServer))]
     public void GetExportCookie()
         => Test(() =>
@@ -472,7 +503,7 @@ public class OleTxTests : IClassFixture<OleTxTests.OleTxFixture>
         // In CI, we sometimes get XACT_E_TMNOTAVAILABLE; when it happens, it's typically on the very first
         // attempt to connect to MSDTC (flaky/slow on-demand startup of MSDTC), though not only.
         // This catches that error and retries.
-        int nRetries = 5;
+        int nRetries = 60;
 
         while (true)
         {