Fix thread pool hang (#56346)
author Koundinya Veluri <kouvel@users.noreply.github.com>
Tue, 27 Jul 2021 16:59:14 +0000 (09:59 -0700)
committer GitHub <noreply@github.com>
Tue, 27 Jul 2021 16:59:14 +0000 (09:59 -0700)
* Fix thread pool hang

- In https://github.com/dotnet/runtime/pull/53471 the thread count goal was moved out of `ThreadCounts`, but it turns out that there are a few subtle races that keeping the goal inside `ThreadCounts` was avoiding. There are other ways to fix it, but I've added the goal back into `ThreadCounts` for now.
- Reverted PR https://github.com/dotnet/runtime/pull/55985, which worked around the issue in the CI

Fixes https://github.com/dotnet/runtime/issues/55642

* Revert "mitigation for quic tests hangs (#55985)"

This reverts commit 0a5e93b09fe92cf866456552eef78a95bf6fdf27.

src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs
src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.Blocking.cs
src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.GateThread.cs
src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.ThreadCounts.cs
src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.WorkerThread.cs
src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.cs
src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs

index 616d837..a6c554b 100644 (file)
@@ -595,9 +595,7 @@ namespace System.Net.Quic.Implementations.MsQuic
             byte[] rentedBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length);
             try
             {
-                Task<int> t = ReadAsync(new Memory<byte>(rentedBuffer, 0, buffer.Length)).AsTask();
-                ((IAsyncResult)t).AsyncWaitHandle.WaitOne();
-                int readLength = t.GetAwaiter().GetResult();
+                int readLength = ReadAsync(new Memory<byte>(rentedBuffer, 0, buffer.Length)).AsTask().GetAwaiter().GetResult();
                 rentedBuffer.AsSpan(0, readLength).CopyTo(buffer);
                 return readLength;
             }
@@ -612,9 +610,7 @@ namespace System.Net.Quic.Implementations.MsQuic
             ThrowIfDisposed();
 
             // TODO: optimize this.
-            Task t = WriteAsync(buffer.ToArray()).AsTask();
-            ((IAsyncResult)t).AsyncWaitHandle.WaitOne();
-            t.GetAwaiter().GetResult();
+            WriteAsync(buffer.ToArray()).AsTask().GetAwaiter().GetResult();
         }
 
         // MsQuic doesn't support explicit flushing
index 1ac8ecb..bf2b0d1 100644 (file)
@@ -12,7 +12,7 @@ namespace System.Threading
             get
             {
                 _threadAdjustmentLock.VerifyIsLocked();
-                return Math.Min(_separated.numThreadsGoal, TargetThreadsGoalForBlockingAdjustment);
+                return Math.Min(_separated.counts.NumThreadsGoal, TargetThreadsGoalForBlockingAdjustment);
             }
         }
 
@@ -44,7 +44,7 @@ namespace System.Threading
                 Debug.Assert(_numBlockedThreads > 0);
 
                 if (_pendingBlockingAdjustment != PendingBlockingAdjustment.WithDelayIfNecessary &&
-                    _separated.numThreadsGoal < TargetThreadsGoalForBlockingAdjustment)
+                    _separated.counts.NumThreadsGoal < TargetThreadsGoalForBlockingAdjustment)
                 {
                     if (_pendingBlockingAdjustment == PendingBlockingAdjustment.None)
                     {
@@ -79,7 +79,7 @@ namespace System.Threading
 
                 if (_pendingBlockingAdjustment != PendingBlockingAdjustment.Immediately &&
                     _numThreadsAddedDueToBlocking > 0 &&
-                    _separated.numThreadsGoal > TargetThreadsGoalForBlockingAdjustment)
+                    _separated.counts.NumThreadsGoal > TargetThreadsGoalForBlockingAdjustment)
                 {
                     wakeGateThread = true;
                     _pendingBlockingAdjustment = PendingBlockingAdjustment.Immediately;
@@ -126,7 +126,8 @@ namespace System.Threading
             addWorker = false;
 
             short targetThreadsGoal = TargetThreadsGoalForBlockingAdjustment;
-            short numThreadsGoal = _separated.numThreadsGoal;
+            ThreadCounts counts = _separated.counts;
+            short numThreadsGoal = counts.NumThreadsGoal;
             if (numThreadsGoal == targetThreadsGoal)
             {
                 return 0;
@@ -144,7 +145,8 @@ namespace System.Threading
 
                 short toSubtract = Math.Min((short)(numThreadsGoal - targetThreadsGoal), _numThreadsAddedDueToBlocking);
                 _numThreadsAddedDueToBlocking -= toSubtract;
-                _separated.numThreadsGoal = numThreadsGoal -= toSubtract;
+                numThreadsGoal -= toSubtract;
+                _separated.counts.InterlockedSetNumThreadsGoal(numThreadsGoal);
                 HillClimbing.ThreadPoolHillClimber.ForceChange(
                     numThreadsGoal,
                     HillClimbing.StateOrTransition.CooperativeBlocking);
@@ -158,7 +160,6 @@ namespace System.Threading
             {
                 // Calculate how many threads can be added without a delay. Threads that were already created but may be just
                 // waiting for work can be released for work without a delay, but creating a new thread may need a delay.
-                ThreadCounts counts = _separated.counts;
                 short maxThreadsGoalWithoutDelay =
                     Math.Max(configuredMaxThreadsWithoutDelay, Math.Min(counts.NumExistingThreads, _maxThreads));
                 short targetThreadsGoalWithoutDelay = Math.Min(targetThreadsGoal, maxThreadsGoalWithoutDelay);
@@ -225,7 +226,7 @@ namespace System.Threading
                 } while (false);
 
                 _numThreadsAddedDueToBlocking += (short)(newNumThreadsGoal - numThreadsGoal);
-                _separated.numThreadsGoal = newNumThreadsGoal;
+                counts = _separated.counts.InterlockedSetNumThreadsGoal(newNumThreadsGoal);
                 HillClimbing.ThreadPoolHillClimber.ForceChange(
                     newNumThreadsGoal,
                     HillClimbing.StateOrTransition.CooperativeBlocking);
index fefe16d..b28286d 100644 (file)
@@ -126,20 +126,31 @@ namespace System.Threading
                                 // of the number of existing threads, is compared with the goal. There may be alternative
                                 // solutions, for now this is only to maintain consistency in behavior.
                                 ThreadCounts counts = threadPoolInstance._separated.counts;
-                                if (counts.NumProcessingWork < threadPoolInstance._maxThreads &&
-                                    counts.NumProcessingWork >= threadPoolInstance._separated.numThreadsGoal)
+                                while (
+                                    counts.NumProcessingWork < threadPoolInstance._maxThreads &&
+                                    counts.NumProcessingWork >= counts.NumThreadsGoal)
                                 {
                                     if (debuggerBreakOnWorkStarvation)
                                     {
                                         Debugger.Break();
                                     }
 
+                                    ThreadCounts newCounts = counts;
                                     short newNumThreadsGoal = (short)(counts.NumProcessingWork + 1);
-                                    threadPoolInstance._separated.numThreadsGoal = newNumThreadsGoal;
-                                    HillClimbing.ThreadPoolHillClimber.ForceChange(
-                                        newNumThreadsGoal,
-                                        HillClimbing.StateOrTransition.Starvation);
-                                    addWorker = true;
+                                    newCounts.NumThreadsGoal = newNumThreadsGoal;
+
+                                    ThreadCounts countsBeforeUpdate =
+                                        threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts);
+                                    if (countsBeforeUpdate == counts)
+                                    {
+                                        HillClimbing.ThreadPoolHillClimber.ForceChange(
+                                            newNumThreadsGoal,
+                                            HillClimbing.StateOrTransition.Starvation);
+                                        addWorker = true;
+                                        break;
+                                    }
+
+                                    counts = countsBeforeUpdate;
                                 }
                             }
                             finally
@@ -183,7 +194,7 @@ namespace System.Threading
                 }
                 else
                 {
-                    minimumDelay = (uint)threadPoolInstance._separated.numThreadsGoal * DequeueDelayThresholdMs;
+                    minimumDelay = (uint)threadPoolInstance._separated.counts.NumThreadsGoal * DequeueDelayThresholdMs;
                 }
 
                 return delay > minimumDelay;
index d4673f5..43aa0fc 100644 (file)
@@ -16,14 +16,15 @@ namespace System.Threading
             // SOS's ThreadPool command depends on this layout
             private const byte NumProcessingWorkShift = 0;
             private const byte NumExistingThreadsShift = 16;
+            private const byte NumThreadsGoalShift = 32;
 
-            private uint _data; // SOS's ThreadPool command depends on this name
+            private ulong _data; // SOS's ThreadPool command depends on this name
 
-            private ThreadCounts(uint data) => _data = data;
+            private ThreadCounts(ulong data) => _data = data;
 
             private short GetInt16Value(byte shift) => (short)(_data >> shift);
             private void SetInt16Value(short value, byte shift) =>
-                _data = (_data & ~((uint)ushort.MaxValue << shift)) | ((uint)(ushort)value << shift);
+                _data = (_data & ~((ulong)ushort.MaxValue << shift)) | ((ulong)(ushort)value << shift);
 
             /// <summary>
             /// Number of threads processing work items.
@@ -43,7 +44,7 @@ namespace System.Threading
                 Debug.Assert(value >= 0);
                 Debug.Assert(value <= NumProcessingWork);
 
-                _data -= (uint)(ushort)value << NumProcessingWorkShift;
+                _data -= (ulong)(ushort)value << NumProcessingWorkShift;
             }
 
             public void InterlockedDecrementNumProcessingWork()
@@ -72,19 +73,61 @@ namespace System.Threading
                 Debug.Assert(value >= 0);
                 Debug.Assert(value <= NumExistingThreads);
 
-                _data -= (uint)(ushort)value << NumExistingThreadsShift;
+                _data -= (ulong)(ushort)value << NumExistingThreadsShift;
+            }
+
+            /// <summary>
+            /// Max possible thread pool threads we want to have.
+            /// </summary>
+            public short NumThreadsGoal
+            {
+                get => GetInt16Value(NumThreadsGoalShift);
+                set
+                {
+                    Debug.Assert(value > 0);
+                    SetInt16Value(value, NumThreadsGoalShift);
+                }
+            }
+
+            public ThreadCounts InterlockedSetNumThreadsGoal(short value)
+            {
+                ThreadPoolInstance._threadAdjustmentLock.VerifyIsLocked();
+
+                ThreadCounts counts = this;
+                while (true)
+                {
+                    ThreadCounts newCounts = counts;
+                    newCounts.NumThreadsGoal = value;
+
+                    ThreadCounts countsBeforeUpdate = InterlockedCompareExchange(newCounts, counts);
+                    if (countsBeforeUpdate == counts)
+                    {
+                        return newCounts;
+                    }
+
+                    counts = countsBeforeUpdate;
+                }
             }
 
             public ThreadCounts VolatileRead() => new ThreadCounts(Volatile.Read(ref _data));
 
-            public ThreadCounts InterlockedCompareExchange(ThreadCounts newCounts, ThreadCounts oldCounts) =>
-                new ThreadCounts(Interlocked.CompareExchange(ref _data, newCounts._data, oldCounts._data));
+            public ThreadCounts InterlockedCompareExchange(ThreadCounts newCounts, ThreadCounts oldCounts)
+            {
+#if DEBUG
+                if (newCounts.NumThreadsGoal != oldCounts.NumThreadsGoal)
+                {
+                    ThreadPoolInstance._threadAdjustmentLock.VerifyIsLocked();
+                }
+#endif
+
+                return new ThreadCounts(Interlocked.CompareExchange(ref _data, newCounts._data, oldCounts._data));
+            }
 
             public static bool operator ==(ThreadCounts lhs, ThreadCounts rhs) => lhs._data == rhs._data;
             public static bool operator !=(ThreadCounts lhs, ThreadCounts rhs) => lhs._data != rhs._data;
 
             public override bool Equals([NotNullWhen(true)] object? obj) => obj is ThreadCounts other && _data == other._data;
-            public override int GetHashCode() => (int)_data;
+            public override int GetHashCode() => (int)_data + (int)(_data >> 32);
         }
     }
 }
index 8b1a449..1298d1b 100644 (file)
@@ -124,22 +124,19 @@ namespace System.Threading
                             ThreadCounts newCounts = counts;
                             newCounts.SubtractNumExistingThreads(1);
                             short newNumExistingThreads = (short)(numExistingThreads - 1);
-
-                            ThreadCounts oldCounts = threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts);
+                            short newNumThreadsGoal =
+                                Math.Max(
+                                    threadPoolInstance.MinThreadsGoal,
+                                    Math.Min(newNumExistingThreads, counts.NumThreadsGoal));
+                            newCounts.NumThreadsGoal = newNumThreadsGoal;
+
+                            ThreadCounts oldCounts =
+                                threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts);
                             if (oldCounts == counts)
                             {
-                                short newNumThreadsGoal =
-                                    Math.Max(
-                                        threadPoolInstance.MinThreadsGoal,
-                                        Math.Min(newNumExistingThreads, threadPoolInstance._separated.numThreadsGoal));
-                                if (threadPoolInstance._separated.numThreadsGoal != newNumThreadsGoal)
-                                {
-                                    threadPoolInstance._separated.numThreadsGoal = newNumThreadsGoal;
-                                    HillClimbing.ThreadPoolHillClimber.ForceChange(
-                                        newNumThreadsGoal,
-                                        HillClimbing.StateOrTransition.ThreadTimedOut);
-                                }
-
+                                HillClimbing.ThreadPoolHillClimber.ForceChange(
+                                    newNumThreadsGoal,
+                                    HillClimbing.StateOrTransition.ThreadTimedOut);
                                 if (NativeRuntimeEventSource.Log.IsEnabled())
                                 {
                                     NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadStop((uint)newNumExistingThreads);
@@ -181,7 +178,7 @@ namespace System.Threading
                 while (true)
                 {
                     numProcessingWork = counts.NumProcessingWork;
-                    if (numProcessingWork >= threadPoolInstance._separated.numThreadsGoal)
+                    if (numProcessingWork >= counts.NumThreadsGoal)
                     {
                         return;
                     }
@@ -256,7 +253,7 @@ namespace System.Threading
                     // code from which this implementation was ported, which turns a processing thread into a retired thread
                     // and checks for pending requests like RemoveWorkingWorker. In this implementation there are
                     // no retired threads, so only the count of threads processing work is considered.
-                    if (counts.NumProcessingWork <= threadPoolInstance._separated.numThreadsGoal)
+                    if (counts.NumProcessingWork <= counts.NumThreadsGoal)
                     {
                         return false;
                     }
index 8c6163f..a425298 100644 (file)
@@ -51,8 +51,6 @@ namespace System.Threading
         {
             [FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 1)]
             public ThreadCounts counts; // SOS's ThreadPool command depends on this name
-            [FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 1 + sizeof(uint))]
-            public short numThreadsGoal;
 
             [FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 2)]
             public int lastDequeueTime;
@@ -103,7 +101,7 @@ namespace System.Threading
                 _maxThreads = _minThreads;
             }
 
-            _separated.numThreadsGoal = _minThreads;
+            _separated.counts.NumThreadsGoal = _minThreads;
         }
 
         public bool SetMinThreads(int workerThreads, int ioCompletionThreads)
@@ -142,9 +140,9 @@ namespace System.Threading
                         wakeGateThread = true;
                     }
                 }
-                else if (_separated.numThreadsGoal < newMinThreads)
+                else if (_separated.counts.NumThreadsGoal < newMinThreads)
                 {
-                    _separated.numThreadsGoal = newMinThreads;
+                    _separated.counts.InterlockedSetNumThreadsGoal(newMinThreads);
                     if (_separated.numRequestedWorkers > 0)
                     {
                         addWorker = true;
@@ -193,9 +191,9 @@ namespace System.Threading
 
                 short newMaxThreads = (short)Math.Min(workerThreads, MaxPossibleThreadCount);
                 _maxThreads = newMaxThreads;
-                if (_separated.numThreadsGoal > newMaxThreads)
+                if (_separated.counts.NumThreadsGoal > newMaxThreads)
                 {
-                    _separated.numThreadsGoal = newMaxThreads;
+                    _separated.counts.InterlockedSetNumThreadsGoal(newMaxThreads);
                 }
                 return true;
             }
@@ -272,13 +270,15 @@ namespace System.Threading
             bool addWorker = false;
             try
             {
-                // Skip hill climbing when there is a pending blocking adjustment. Hill climbing may otherwise bypass the
-                // blocking adjustment heuristics and increase the thread count too quickly.
-                if (_pendingBlockingAdjustment != PendingBlockingAdjustment.None)
+                // Repeated checks from ShouldAdjustMaxWorkersActive() inside the lock
+                ThreadCounts counts = _separated.counts;
+                if (counts.NumProcessingWork > counts.NumThreadsGoal ||
+                    _pendingBlockingAdjustment != PendingBlockingAdjustment.None)
                 {
                     return;
                 }
 
+
                 long startTime = _currentSampleStartTime;
                 long endTime = Stopwatch.GetTimestamp();
                 long freq = Stopwatch.Frequency;
@@ -291,13 +291,13 @@ namespace System.Threading
                     int totalNumCompletions = (int)_completionCounter.Count;
                     int numCompletions = totalNumCompletions - _separated.priorCompletionCount;
 
+                    short oldNumThreadsGoal = counts.NumThreadsGoal;
                     int newNumThreadsGoal;
                     (newNumThreadsGoal, _threadAdjustmentIntervalMs) =
-                        HillClimbing.ThreadPoolHillClimber.Update(_separated.numThreadsGoal, elapsedSeconds, numCompletions);
-                    short oldNumThreadsGoal = _separated.numThreadsGoal;
+                        HillClimbing.ThreadPoolHillClimber.Update(oldNumThreadsGoal, elapsedSeconds, numCompletions);
                     if (oldNumThreadsGoal != (short)newNumThreadsGoal)
                     {
-                        _separated.numThreadsGoal = (short)newNumThreadsGoal;
+                        _separated.counts.InterlockedSetNumThreadsGoal((short)newNumThreadsGoal);
 
                         //
                         // If we're increasing the goal, inject a thread.  If that thread finds work, it will inject
@@ -354,7 +354,8 @@ namespace System.Threading
             // threads processing work to stop in response to a decreased thread count goal. The logic here is a bit
             // different from the original CoreCLR code from which this implementation was ported because in this
             // implementation there are no retired threads, so only the count of threads processing work is considered.
-            if (_separated.counts.NumProcessingWork > _separated.numThreadsGoal)
+            ThreadCounts counts = _separated.counts;
+            if (counts.NumProcessingWork > counts.NumThreadsGoal)
             {
                 return false;
             }
index 767e5be..89c8b54 100644 (file)
@@ -2,6 +2,7 @@
 // The .NET Foundation licenses this file to you under the MIT license.
 
 using System.Collections.Generic;
+using System.Diagnostics;
 using System.Linq;
 using System.Reflection;
 using System.Threading.Tasks;
@@ -949,6 +950,57 @@ namespace System.Threading.ThreadPools.Tests
             }).Dispose();
         }
 
+        [ConditionalFact(nameof(IsThreadingAndRemoteExecutorSupported))]
+        public static void CooperativeBlockingWithProcessingThreadsAndGoalThreadsAndAddWorkerRaceTest()
+        {
+            // Avoid contaminating the main process' environment
+            RemoteExecutor.Invoke(() =>
+            {
+                try
+                {
+                    // The test is run affinitized to at most 2 processors for more frequent repros. The actual test process below
+                    // will inherit the affinity.
+                    Process testParentProcess = Process.GetCurrentProcess();
+                    testParentProcess.ProcessorAffinity = (nint)testParentProcess.ProcessorAffinity & 0x3;
+                }
+                catch (PlatformNotSupportedException)
+                {
+                    // Processor affinity is not supported on some platforms, try to run the test anyway
+                }
+
+                RemoteExecutor.Invoke(() =>
+                {
+                    const uint TestDurationMs = 4000;
+
+                    var done = new ManualResetEvent(false);
+                    int startTimeMs = Environment.TickCount;
+                    Action<object> completingTask = data => ((TaskCompletionSource<int>)data).SetResult(0);
+                    Action repeatingTask = null;
+                    repeatingTask = () =>
+                    {
+                        if ((uint)(Environment.TickCount - startTimeMs) >= TestDurationMs)
+                        {
+                            done.Set();
+                            return;
+                        }
+
+                        Task.Run(repeatingTask);
+
+                        var tcs = new TaskCompletionSource<int>();
+                        Task.Factory.StartNew(completingTask, tcs);
+                        tcs.Task.Wait();
+                    };
+
+                    for (int i = 0; i < Environment.ProcessorCount; ++i)
+                    {
+                        Task.Run(repeatingTask);
+                    }
+
+                    done.CheckedWait();
+                }).Dispose();
+            }).Dispose();
+        }
+
         public static bool IsThreadingAndRemoteExecutorSupported =>
             PlatformDetection.IsThreadingSupported && RemoteExecutor.IsSupported;
     }