From bd8b2362c8650b39acf735a6d09fba8a3fe739e2 Mon Sep 17 00:00:00 2001 From: Stephen Toub Date: Tue, 31 Jan 2017 11:51:40 -0500 Subject: [PATCH] Use ConcurrentQueue in ThreadPool Use the same ConcurrentQueue now being used in corefx in ThreadPool instead of ThreadPool's custom queue implementation (which was close in design to the old ConcurrentQueue implementation). Commit migrated from https://github.com/dotnet/coreclr/commit/cba50e660a5e8073a820e7c55fce705cb5b973f6 --- .../mscorlib/src/System/Threading/ThreadPool.cs | 220 +++------------------ 1 file changed, 26 insertions(+), 194 deletions(-) diff --git a/src/coreclr/src/mscorlib/src/System/Threading/ThreadPool.cs b/src/coreclr/src/mscorlib/src/System/Threading/ThreadPool.cs index 451b15d..ac6986d 100644 --- a/src/coreclr/src/mscorlib/src/System/Threading/ThreadPool.cs +++ b/src/coreclr/src/mscorlib/src/System/Threading/ThreadPool.cs @@ -35,6 +35,7 @@ namespace System.Threading using System.Runtime.CompilerServices; using System.Runtime.ConstrainedExecution; using System.Runtime.InteropServices; + using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Diagnostics.Contracts; @@ -408,140 +409,8 @@ namespace System.Threading } } - internal class QueueSegment - { - // Holds a segment of the queue. Enqueues/Dequeues start at element 0, and work their way up. - internal readonly IThreadPoolWorkItem[] nodes; - private const int QueueSegmentLength = 256; - - // Holds the indexes of the lowest and highest valid elements of the nodes array. - // The low index is in the lower 16 bits, high index is in the upper 16 bits. - // Use GetIndexes and CompareExchangeIndexes to manipulate this. - private volatile int indexes; - - // The next segment in the queue. - public volatile QueueSegment Next; - - - const int SixteenBits = 0xffff; - - void GetIndexes(out int upper, out int lower) - { - int i = indexes; - upper = (i >> 16) & SixteenBits; - lower = i & SixteenBits; - - Debug.Assert(upper >= lower); - Debug.Assert(upper <= nodes.Length); - Debug.Assert(lower <= nodes.Length); - Debug.Assert(upper >= 0); - Debug.Assert(lower >= 0); - } - - bool CompareExchangeIndexes(ref int prevUpper, int newUpper, ref int prevLower, int newLower) - { - Debug.Assert(newUpper >= newLower); - Debug.Assert(newUpper <= nodes.Length); - Debug.Assert(newLower <= nodes.Length); - Debug.Assert(newUpper >= 0); - Debug.Assert(newLower >= 0); - Debug.Assert(newUpper >= prevUpper); - Debug.Assert(newLower >= prevLower); - Debug.Assert(newUpper == prevUpper ^ newLower == prevLower); - - int oldIndexes = (prevUpper << 16) | (prevLower & SixteenBits); - int newIndexes = (newUpper << 16) | (newLower & SixteenBits); - int prevIndexes = Interlocked.CompareExchange(ref indexes, newIndexes, oldIndexes); - prevUpper = (prevIndexes >> 16) & SixteenBits; - prevLower = prevIndexes & SixteenBits; - return prevIndexes == oldIndexes; - } - - [ReliabilityContract(Consistency.WillNotCorruptState, Cer.MayFail)] - public QueueSegment() - { - Debug.Assert(QueueSegmentLength <= SixteenBits); - nodes = new IThreadPoolWorkItem[QueueSegmentLength]; - } - - - public bool IsUsedUp() - { - int upper, lower; - GetIndexes(out upper, out lower); - return (upper == nodes.Length) && - (lower == nodes.Length); - } - - public bool TryEnqueue(IThreadPoolWorkItem node) - { - // - // If there's room in this segment, atomically increment the upper count (to reserve - // space for this node), then store the node. - // Note that this leaves a window where it will look like there is data in that - // array slot, but it hasn't been written yet. This is taken care of in TryDequeue - // with a busy-wait loop, waiting for the element to become non-null. This implies - // that we can never store null nodes in this data structure. - // - Debug.Assert(null != node); - - int upper, lower; - GetIndexes(out upper, out lower); - - while (true) - { - if (upper == nodes.Length) - return false; - - if (CompareExchangeIndexes(ref upper, upper + 1, ref lower, lower)) - { - Debug.Assert(Volatile.Read(ref nodes[upper]) == null); - Volatile.Write(ref nodes[upper], node); - return true; - } - } - } - - [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "Reviewed for thread safety")] - public bool TryDequeue(out IThreadPoolWorkItem node) - { - // - // If there are nodes in this segment, increment the lower count, then take the - // element we find there. - // - int upper, lower; - GetIndexes(out upper, out lower); - - while(true) - { - if (lower == upper) - { - node = null; - return false; - } - - if (CompareExchangeIndexes(ref upper, upper, ref lower, lower + 1)) - { - // It's possible that a concurrent call to Enqueue hasn't yet - // written the node reference to the array. We need to spin until - // it shows up. - SpinWait spinner = new SpinWait(); - while ((node = Volatile.Read(ref nodes[lower])) == null) - spinner.SpinOnce(); - - // Null-out the reference so the object can be GC'd earlier. - nodes[lower] = null; - - return true; - } - } - } - } - - // The head and tail of the queue. We enqueue to the head, and dequeue from the tail. - internal volatile QueueSegment queueHead; - internal volatile QueueSegment queueTail; internal bool loggingEnabled; + internal readonly ConcurrentQueue workItems = new ConcurrentQueue(); internal static readonly SparseArray allThreadQueues = new SparseArray(16); @@ -549,7 +418,6 @@ namespace System.Threading public ThreadPoolWorkQueue() { - queueTail = queueHead = new QueueSegment(); loggingEnabled = FrameworkEventSource.Log.IsEnabled(EventLevel.Verbose, FrameworkEventSource.Keywords.ThreadPool|FrameworkEventSource.Keywords.ThreadTransfer); } @@ -615,18 +483,7 @@ namespace System.Threading } else { - QueueSegment head = queueHead; - - while (!head.TryEnqueue(callback)) - { - Interlocked.CompareExchange(ref head.Next, new QueueSegment(), null); - - while (head.Next != null) - { - Interlocked.CompareExchange(ref queueHead, head.Next, head); - head = queueHead; - } - } + workItems.Enqueue(callback); } EnsureThreadRequested(); @@ -652,24 +509,9 @@ namespace System.Threading if (null == callback) { - QueueSegment tail = queueTail; - while (true) + if (workItems.TryDequeue(out callback)) { - if (tail.TryDequeue(out callback)) - { - Debug.Assert(null != callback); - break; - } - - if (null == tail.Next || !tail.IsUsedUp()) - { - break; - } - else - { - Interlocked.CompareExchange(ref queueTail, tail.Next, tail); - tail = queueTail; - } + Debug.Assert(null != callback); } } @@ -1632,42 +1474,25 @@ namespace System.Threading // Get all workitems. Called by TaskScheduler in its debugger hooks. internal static IEnumerable GetQueuedWorkItems() { - return EnumerateQueuedWorkItems(ThreadPoolWorkQueue.allThreadQueues.Current, ThreadPoolGlobals.workQueue.queueTail); - } - - internal static IEnumerable EnumerateQueuedWorkItems(ThreadPoolWorkQueue.WorkStealingQueue[] wsQueues, ThreadPoolWorkQueue.QueueSegment globalQueueTail) - { - if (wsQueues != null) + // Enumerate global queue + foreach (IThreadPoolWorkItem workItem in ThreadPoolGlobals.workQueue.workItems) { - // First, enumerate all workitems in thread-local queues. - foreach (ThreadPoolWorkQueue.WorkStealingQueue wsq in wsQueues) - { - if (wsq != null && wsq.m_array != null) - { - IThreadPoolWorkItem[] items = wsq.m_array; - for (int i = 0; i < items.Length; i++) - { - IThreadPoolWorkItem item = items[i]; - if (item != null) - yield return item; - } - } - } + yield return workItem; } - if (globalQueueTail != null) + // Enumerate each local queue + foreach (ThreadPoolWorkQueue.WorkStealingQueue wsq in ThreadPoolWorkQueue.allThreadQueues.Current) { - // Now the global queue - for (ThreadPoolWorkQueue.QueueSegment segment = globalQueueTail; - segment != null; - segment = segment.Next) + if (wsq != null && wsq.m_array != null) { - IThreadPoolWorkItem[] items = segment.nodes; + IThreadPoolWorkItem[] items = wsq.m_array; for (int i = 0; i < items.Length; i++) { IThreadPoolWorkItem item = items[i]; if (item != null) + { yield return item; + } } } } @@ -1675,13 +1500,20 @@ namespace System.Threading internal static IEnumerable GetLocallyQueuedWorkItems() { - return EnumerateQueuedWorkItems(new ThreadPoolWorkQueue.WorkStealingQueue[] { ThreadPoolWorkQueueThreadLocals.threadLocals.workStealingQueue }, null); + ThreadPoolWorkQueue.WorkStealingQueue wsq = ThreadPoolWorkQueueThreadLocals.threadLocals.workStealingQueue; + if (wsq != null && wsq.m_array != null) + { + IThreadPoolWorkItem[] items = wsq.m_array; + for (int i = 0; i < items.Length; i++) + { + IThreadPoolWorkItem item = items[i]; + if (item != null) + yield return item; + } + } } - internal static IEnumerable GetGloballyQueuedWorkItems() - { - return EnumerateQueuedWorkItems(null, ThreadPoolGlobals.workQueue.queueTail); - } + internal static IEnumerable GetGloballyQueuedWorkItems() => ThreadPoolGlobals.workQueue.workItems; private static object[] ToObjectArray(IEnumerable workitems) { -- 2.7.4