Use ConcurrentQueue<T> in ThreadPool
authorStephen Toub <stoub@microsoft.com>
Tue, 31 Jan 2017 16:51:40 +0000 (11:51 -0500)
committerStephen Toub <stoub@microsoft.com>
Wed, 1 Feb 2017 14:59:58 +0000 (09:59 -0500)
Use the same ConcurrentQueue<T> now being used in corefx in ThreadPool instead of ThreadPool's custom queue implementation (which was close in design to the old ConcurrentQueue<T> implementation).

Commit migrated from https://github.com/dotnet/coreclr/commit/cba50e660a5e8073a820e7c55fce705cb5b973f6

src/coreclr/src/mscorlib/src/System/Threading/ThreadPool.cs

index 451b15d..ac6986d 100644 (file)
@@ -35,6 +35,7 @@ namespace System.Threading
     using System.Runtime.CompilerServices;
     using System.Runtime.ConstrainedExecution;
     using System.Runtime.InteropServices;
+    using System.Collections.Concurrent;
     using System.Collections.Generic;
     using System.Diagnostics;
     using System.Diagnostics.Contracts;
@@ -408,140 +409,8 @@ namespace System.Threading
             }
         }
 
-        internal class QueueSegment
-        {
-            // Holds a segment of the queue.  Enqueues/Dequeues start at element 0, and work their way up.
-            internal readonly IThreadPoolWorkItem[] nodes;
-            private const int QueueSegmentLength = 256;
-
-            // Holds the indexes of the lowest and highest valid elements of the nodes array.
-            // The low index is in the lower 16 bits, high index is in the upper 16 bits.
-            // Use GetIndexes and CompareExchangeIndexes to manipulate this.
-            private volatile int indexes;
-
-            // The next segment in the queue.
-            public volatile QueueSegment Next;
-
-
-            const int SixteenBits = 0xffff;
-
-            void GetIndexes(out int upper, out int lower)
-            {
-                int i = indexes;
-                upper = (i >> 16) & SixteenBits;
-                lower = i & SixteenBits;
-
-                Debug.Assert(upper >= lower);
-                Debug.Assert(upper <= nodes.Length);
-                Debug.Assert(lower <= nodes.Length);
-                Debug.Assert(upper >= 0);
-                Debug.Assert(lower >= 0);
-            }
-
-            bool CompareExchangeIndexes(ref int prevUpper, int newUpper, ref int prevLower, int newLower)
-            {
-                Debug.Assert(newUpper >= newLower);
-                Debug.Assert(newUpper <= nodes.Length);
-                Debug.Assert(newLower <= nodes.Length);
-                Debug.Assert(newUpper >= 0);
-                Debug.Assert(newLower >= 0);
-                Debug.Assert(newUpper >= prevUpper);
-                Debug.Assert(newLower >= prevLower);
-                Debug.Assert(newUpper == prevUpper ^ newLower == prevLower);
-
-                int oldIndexes = (prevUpper << 16) | (prevLower & SixteenBits);
-                int newIndexes = (newUpper << 16) | (newLower & SixteenBits);
-                int prevIndexes = Interlocked.CompareExchange(ref indexes, newIndexes, oldIndexes);
-                prevUpper = (prevIndexes >> 16) & SixteenBits;
-                prevLower = prevIndexes & SixteenBits;
-                return prevIndexes == oldIndexes;
-            }
-
-            [ReliabilityContract(Consistency.WillNotCorruptState, Cer.MayFail)]
-            public QueueSegment()
-            {
-                Debug.Assert(QueueSegmentLength <= SixteenBits);
-                nodes = new IThreadPoolWorkItem[QueueSegmentLength];
-            }
-
-
-            public bool IsUsedUp()
-            {
-                int upper, lower;
-                GetIndexes(out upper, out lower);
-                return (upper == nodes.Length) && 
-                       (lower == nodes.Length);
-            }
-
-            public bool TryEnqueue(IThreadPoolWorkItem node)
-            {
-                //
-                // If there's room in this segment, atomically increment the upper count (to reserve
-                // space for this node), then store the node.
-                // Note that this leaves a window where it will look like there is data in that
-                // array slot, but it hasn't been written yet.  This is taken care of in TryDequeue
-                // with a busy-wait loop, waiting for the element to become non-null.  This implies
-                // that we can never store null nodes in this data structure.
-                //
-                Debug.Assert(null != node);
-
-                int upper, lower;
-                GetIndexes(out upper, out lower);
-
-                while (true)
-                {
-                    if (upper == nodes.Length)
-                        return false;
-
-                    if (CompareExchangeIndexes(ref upper, upper + 1, ref lower, lower))
-                    {
-                        Debug.Assert(Volatile.Read(ref nodes[upper]) == null);
-                        Volatile.Write(ref nodes[upper], node);
-                        return true;
-                    }
-                }
-            }
-
-            [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "Reviewed for thread safety")]
-            public bool TryDequeue(out IThreadPoolWorkItem node)
-            {
-                //
-                // If there are nodes in this segment, increment the lower count, then take the
-                // element we find there.
-                //
-                int upper, lower;
-                GetIndexes(out upper, out lower);
-
-                while(true)
-                {
-                    if (lower == upper)
-                    {
-                        node = null;
-                        return false;
-                    }
-
-                    if (CompareExchangeIndexes(ref upper, upper, ref lower, lower + 1))
-                    {
-                        // It's possible that a concurrent call to Enqueue hasn't yet
-                        // written the node reference to the array.  We need to spin until
-                        // it shows up.
-                        SpinWait spinner = new SpinWait();
-                        while ((node = Volatile.Read(ref nodes[lower])) == null)
-                            spinner.SpinOnce();
-
-                        // Null-out the reference so the object can be GC'd earlier.
-                        nodes[lower] = null;
-
-                        return true;
-                    }
-                }
-            }
-        }
-
-        // The head and tail of the queue.  We enqueue to the head, and dequeue from the tail.
-        internal volatile QueueSegment queueHead;
-        internal volatile QueueSegment queueTail;
         internal bool loggingEnabled;
+        internal readonly ConcurrentQueue<IThreadPoolWorkItem> workItems = new ConcurrentQueue<IThreadPoolWorkItem>();
 
         internal static readonly SparseArray<WorkStealingQueue> allThreadQueues = new SparseArray<WorkStealingQueue>(16);
 
@@ -549,7 +418,6 @@ namespace System.Threading
       
         public ThreadPoolWorkQueue()
         {
-            queueTail = queueHead = new QueueSegment();
             loggingEnabled = FrameworkEventSource.Log.IsEnabled(EventLevel.Verbose, FrameworkEventSource.Keywords.ThreadPool|FrameworkEventSource.Keywords.ThreadTransfer);
         }
 
@@ -615,18 +483,7 @@ namespace System.Threading
             }
             else
             {
-                QueueSegment head = queueHead;
-
-                while (!head.TryEnqueue(callback))
-                {
-                    Interlocked.CompareExchange(ref head.Next, new QueueSegment(), null);
-
-                    while (head.Next != null)
-                    {
-                        Interlocked.CompareExchange(ref queueHead, head.Next, head);
-                        head = queueHead;
-                    }
-                }
+                workItems.Enqueue(callback);
             }
 
             EnsureThreadRequested();
@@ -652,24 +509,9 @@ namespace System.Threading
 
             if (null == callback)
             {
-                QueueSegment tail = queueTail;
-                while (true)
+                if (workItems.TryDequeue(out callback))
                 {
-                    if (tail.TryDequeue(out callback))
-                    {
-                        Debug.Assert(null != callback);
-                        break;
-                    }
-
-                    if (null == tail.Next || !tail.IsUsedUp())
-                    {
-                        break;
-                    }
-                    else
-                    {
-                        Interlocked.CompareExchange(ref queueTail, tail.Next, tail);
-                        tail = queueTail;
-                    }
+                    Debug.Assert(null != callback);
                 }
             }
 
@@ -1632,42 +1474,25 @@ namespace System.Threading
         // Get all workitems.  Called by TaskScheduler in its debugger hooks.
         internal static IEnumerable<IThreadPoolWorkItem> GetQueuedWorkItems()
         {
-            return EnumerateQueuedWorkItems(ThreadPoolWorkQueue.allThreadQueues.Current, ThreadPoolGlobals.workQueue.queueTail);
-        }
-
-        internal static IEnumerable<IThreadPoolWorkItem> EnumerateQueuedWorkItems(ThreadPoolWorkQueue.WorkStealingQueue[] wsQueues, ThreadPoolWorkQueue.QueueSegment globalQueueTail)
-        {
-            if (wsQueues != null)
+            // Enumerate global queue
+            foreach (IThreadPoolWorkItem workItem in ThreadPoolGlobals.workQueue.workItems)
             {
-                // First, enumerate all workitems in thread-local queues.
-                foreach (ThreadPoolWorkQueue.WorkStealingQueue wsq in wsQueues)
-                {
-                    if (wsq != null && wsq.m_array != null)
-                    {
-                        IThreadPoolWorkItem[] items = wsq.m_array;
-                        for (int i = 0; i < items.Length; i++)
-                        {
-                            IThreadPoolWorkItem item = items[i];
-                            if (item != null)
-                                yield return item;
-                        }
-                    }
-                }
+                yield return workItem;
             }
 
-            if (globalQueueTail != null)
+            // Enumerate each local queue
+            foreach (ThreadPoolWorkQueue.WorkStealingQueue wsq in ThreadPoolWorkQueue.allThreadQueues.Current)
             {
-                // Now the global queue
-                for (ThreadPoolWorkQueue.QueueSegment segment = globalQueueTail;
-                    segment != null;
-                    segment = segment.Next)
+                if (wsq != null && wsq.m_array != null)
                 {
-                    IThreadPoolWorkItem[] items = segment.nodes;
+                    IThreadPoolWorkItem[] items = wsq.m_array;
                     for (int i = 0; i < items.Length; i++)
                     {
                         IThreadPoolWorkItem item = items[i];
                         if (item != null)
+                        {
                             yield return item;
+                        }
                     }
                 }
             }
@@ -1675,13 +1500,20 @@ namespace System.Threading
 
         internal static IEnumerable<IThreadPoolWorkItem> GetLocallyQueuedWorkItems()
         {
-            return EnumerateQueuedWorkItems(new ThreadPoolWorkQueue.WorkStealingQueue[] { ThreadPoolWorkQueueThreadLocals.threadLocals.workStealingQueue }, null);
+            ThreadPoolWorkQueue.WorkStealingQueue wsq = ThreadPoolWorkQueueThreadLocals.threadLocals.workStealingQueue;
+            if (wsq != null && wsq.m_array != null)
+            {
+                IThreadPoolWorkItem[] items = wsq.m_array;
+                for (int i = 0; i < items.Length; i++)
+                {
+                    IThreadPoolWorkItem item = items[i];
+                    if (item != null)
+                        yield return item;
+                }
+            }
         }
 
-        internal static IEnumerable<IThreadPoolWorkItem> GetGloballyQueuedWorkItems()
-        {
-            return EnumerateQueuedWorkItems(null, ThreadPoolGlobals.workQueue.queueTail);
-        }
+        internal static IEnumerable<IThreadPoolWorkItem> GetGloballyQueuedWorkItems() => ThreadPoolGlobals.workQueue.workItems;
 
         private static object[] ToObjectArray(IEnumerable<IThreadPoolWorkItem> workitems)
         {