Revert two changes to thread requests (#14015)
authorKoundinya Veluri <kouvel@users.noreply.github.com>
Fri, 15 Sep 2017 21:15:25 +0000 (14:15 -0700)
committerGitHub <noreply@github.com>
Fri, 15 Sep 2017 21:15:25 +0000 (14:15 -0700)
Reverting 99db31c41d5057e08cc4701c79f11246b9191a9b and fd91ee1fa23f35130f576c19dfaf35934dc2ce24 to unblock others while trying to figure out what the issues are and how to fix them.

fd91ee1fa23f35130f576c19dfaf35934dc2ce24 is causing @benaadams thread pool perf test (https://github.com/benaadams/ThreadPoolTaskTesting) to hang due to a missed thread request. Somehow wsqActive is ending up at zero while there is a work item in the queue and with no pending thread requests. I don't understand how yet.

99db31c41d5057e08cc4701c79f11246b9191a9b appears to have a potential issue because the order of MarkThreadRequestSatisfied and Dequeue are reversed. For instance, assuming a proc count of 1:
- Initial state: 1 work item enqueued, 1 thread request
- T1 Dispatch: dequeues a work item and requests a thread (0 work items, 1 thread request)
- T1 Dispatch: sees no more work items, returns
- T1 calls Dispatch again due to its own thread request
- T1 Dispatch: After Dequeue (which saw 0 work items) and before MarkThreadRequestSatisfied:
  - Current state: 0 work items, 1 thread request
  - T2 enqueues a work item, sees 1 thread request and does not request a thread (1 work item, 1 thread request)
- T1 Dispatch: MarkThreadRequestSatisfied decrements thread requests (1 work item, 0 thread requests)
- Now after T1 returns, it won't wake up again but there is still one work item in the queue

src/mscorlib/src/System/Threading/ThreadPool.cs

index ec9ceef..fa1dd09 100644 (file)
@@ -48,12 +48,6 @@ namespace System.Threading
 
             public static WorkStealingQueue[] Queues => _queues;
 
-            // Track whether the WorkStealingQueueList is empty
-            // Three states simplifies race conditions.  They may be considered.
-            // Now Active --> Maybe Inactive -> Confirmed Inactive
-            public const int WsqNowActive = 2;
-            public static int wsqActive;
-
             public static void Add(WorkStealingQueue queue)
             {
                 Debug.Assert(queue != null);
@@ -400,9 +394,6 @@ namespace System.Threading
             ThreadPoolWorkQueueThreadLocals.threadLocals ??
             (ThreadPoolWorkQueueThreadLocals.threadLocals = new ThreadPoolWorkQueueThreadLocals(this));
 
-        internal bool ThreadRequestNeeded(int count) => (count < ThreadPoolGlobals.processorCount) &&
-            (!workItems.IsEmpty || (WorkStealingQueueList.wsqActive > 0));
-
         internal void EnsureThreadRequested()
         {
             //
@@ -413,7 +404,7 @@ namespace System.Threading
             // which is handled by RequestWorkerThread.
             //
             int count = numOutstandingThreadRequests;
-            while (ThreadRequestNeeded(count))
+            while (count < ThreadPoolGlobals.processorCount)
             {
                 int prev = Interlocked.CompareExchange(ref numOutstandingThreadRequests, count + 1, count);
                 if (prev == count)
@@ -425,7 +416,7 @@ namespace System.Threading
             }
         }
 
-        internal void MarkThreadRequestSatisfied(bool dequeueSuccessful)
+        internal void MarkThreadRequestSatisfied()
         {
             //
             // The VM has called us, so one of our outstanding thread requests has been satisfied.
@@ -434,17 +425,8 @@ namespace System.Threading
             // by the time we reach this point.
             //
             int count = numOutstandingThreadRequests;
-
             while (count > 0)
             {
-                if (dequeueSuccessful && (count == ThreadPoolGlobals.processorCount) && ThreadRequestNeeded(count - 1))
-                {
-                    // If we gated threads due to too many outstanding requests and queue was not empty
-                    // Request another thread.
-                    ThreadPool.RequestWorkerThread();
-                    return;
-                }
-
                 int prev = Interlocked.CompareExchange(ref numOutstandingThreadRequests, count - 1, count);
                 if (prev == count)
                 {
@@ -466,18 +448,6 @@ namespace System.Threading
             if (null != tl)
             {
                 tl.workStealingQueue.LocalPush(callback);
-
-                // We must guarantee wsqActive is set to WsqNowActive after we push
-                // The ordering must be global because we rely on other threads
-                // observing in this order
-                Interlocked.MemoryBarrier();
-
-                // We do not want to simply write.  We want to prevent unnecessary writes
-                // which would invalidate reader's caches
-                if (WorkStealingQueueList.wsqActive != WorkStealingQueueList.WsqNowActive)
-                {
-                    Volatile.Write(ref WorkStealingQueueList.wsqActive, WorkStealingQueueList.WsqNowActive);
-                }
             }
             else
             {
@@ -495,56 +465,33 @@ namespace System.Threading
 
         public IThreadPoolWorkItem Dequeue(ThreadPoolWorkQueueThreadLocals tl, ref bool missedSteal)
         {
+            WorkStealingQueue localWsq = tl.workStealingQueue;
             IThreadPoolWorkItem callback;
-            int wsqActiveObserved = WorkStealingQueueList.wsqActive;
-            if (wsqActiveObserved > 0)
-            {
-                WorkStealingQueue localWsq = tl.workStealingQueue;
 
-                if ((callback = localWsq.LocalPop()) == null && // first try the local queue
-                    !workItems.TryDequeue(out callback)) // then try the global queue
+            if ((callback = localWsq.LocalPop()) == null && // first try the local queue
+                !workItems.TryDequeue(out callback)) // then try the global queue
+            {
+                // finally try to steal from another thread's local queue
+                WorkStealingQueue[] queues = WorkStealingQueueList.Queues;
+                int c = queues.Length;
+                Debug.Assert(c > 0, "There must at least be a queue for this thread.");
+                int maxIndex = c - 1;
+                int i = tl.random.Next(c);
+                while (c > 0)
                 {
-                    // finally try to steal from another thread's local queue
-                    WorkStealingQueue[] queues = WorkStealingQueueList.Queues;
-                    int c = queues.Length;
-                    Debug.Assert(c > 0, "There must at least be a queue for this thread.");
-                    int maxIndex = c - 1;
-                    int i = tl.random.Next(c);
-                    while (c > 0)
+                    i = (i < maxIndex) ? i + 1 : 0;
+                    WorkStealingQueue otherQueue = queues[i];
+                    if (otherQueue != localWsq && otherQueue.CanSteal)
                     {
-                        i = (i < maxIndex) ? i + 1 : 0;
-                        WorkStealingQueue otherQueue = queues[i];
-                        if (otherQueue != localWsq && otherQueue.CanSteal)
+                        callback = otherQueue.TrySteal(ref missedSteal);
+                        if (callback != null)
                         {
-                            callback = otherQueue.TrySteal(ref missedSteal);
-                            if (callback != null)
-                            {
-                                break;
-                            }
+                            break;
                         }
-                        c--;
-                    }
-                    if ((callback == null) && !missedSteal)
-                    {
-                        // Only decrement if the value is unchanged since we started looking for work
-                        // This prevents multiple threads decrementing based on overlapping scans.
-                        //
-                        // When we decrement from active, the producer may have inserted a queue item during our scan
-                        // therefore we cannot transition to empty
-                        //
-                        // When we decrement from Maybe Inactive, if the producer inserted a queue item during our scan,
-                        // the producer must write Active.  We may transition to empty briefly if we beat the
-                        // producer's write, but the producer will then overwrite us before waking threads.
-                        // So effectively we cannot mark the queue empty when an item is in the queue.
-                        Interlocked.CompareExchange(ref WorkStealingQueueList.wsqActive, wsqActiveObserved - 1, wsqActiveObserved);
                     }
+                    c--;
                 }
             }
-            else
-            {
-                // We only need to look at the global queue since WorkStealingQueueList is inactive
-                workItems.TryDequeue(out callback);
-            }
 
             return callback;
         }
@@ -558,7 +505,15 @@ namespace System.Threading
             //
             int quantumStartTime = Environment.TickCount;
 
-            bool markThreadRequestSatisfied = true;
+            //
+            // Update our records to indicate that an outstanding request for a thread has now been fulfilled.
+            // From this point on, we are responsible for requesting another thread if we stop working for any
+            // reason, and we believe there might still be work in the queue.
+            //
+            // Note that if this thread is aborted before we get a chance to request another one, the VM will
+            // record a thread request on our behalf.  So we don't need to worry about getting aborted right here.
+            //
+            workQueue.MarkThreadRequestSatisfied();
 
             // Has the desire for logging changed since the last time we entered?
             workQueue.loggingEnabled = FrameworkEventSource.Log.IsEnabled(EventLevel.Verbose, FrameworkEventSource.Keywords.ThreadPool | FrameworkEventSource.Keywords.ThreadTransfer);
@@ -607,21 +562,7 @@ namespace System.Threading
                     // If we found work, there may be more work.  Ask for another thread so that the other work can be processed
                     // in parallel.  Note that this will only ask for a max of #procs threads, so it's safe to call it for every dequeue.
                     //
-                    if (markThreadRequestSatisfied)
-                    {
-                        //
-                        // Update our records to indicate that an outstanding request for a thread has now been fulfilled
-                        // and that an item was successfully dispatched and another thread may be needed
-                        //
-                        // From this point on, we are responsible for requesting another thread if we stop working for any
-                        // reason, and we believe there might still be work in the queue.
-                        //
-                        // Note that if this thread is aborted before we get a chance to request another one, the VM will
-                        // record a thread request on our behalf.  So we don't need to worry about getting aborted right here.
-                        //
-                        workQueue.MarkThreadRequestSatisfied(true);
-                        markThreadRequestSatisfied = false;
-                    }
+                    workQueue.EnsureThreadRequested();
 
                     //
                     // Execute the workitem outside of any finally blocks, so that it can be aborted if needed.
@@ -676,15 +617,6 @@ namespace System.Threading
             }
             finally
             {
-                if (markThreadRequestSatisfied)
-                {
-                    //
-                    // Update our records to indicate that an outstanding request for a thread has now been fulfilled
-                    // and that an item was not successfully dispatched.  We will request thread below if needed
-                    //
-                    workQueue.MarkThreadRequestSatisfied(false);
-                }
-
                 //
                 // If we are exiting for any reason other than that the queue is definitely empty, ask for another
                 // thread to pick up where we left off.