using System.Runtime.CompilerServices;
using System.Runtime.ConstrainedExecution;
using System.Runtime.InteropServices;
+ using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Contracts;
}
}
- internal class QueueSegment
- {
- // Holds a segment of the queue. Enqueues/Dequeues start at element 0, and work their way up.
- internal readonly IThreadPoolWorkItem[] nodes;
- private const int QueueSegmentLength = 256;
-
- // Holds the indexes of the lowest and highest valid elements of the nodes array.
- // The low index is in the lower 16 bits, high index is in the upper 16 bits.
- // Use GetIndexes and CompareExchangeIndexes to manipulate this.
- private volatile int indexes;
-
- // The next segment in the queue.
- public volatile QueueSegment Next;
-
-
- const int SixteenBits = 0xffff;
-
- void GetIndexes(out int upper, out int lower)
- {
- int i = indexes;
- upper = (i >> 16) & SixteenBits;
- lower = i & SixteenBits;
-
- Debug.Assert(upper >= lower);
- Debug.Assert(upper <= nodes.Length);
- Debug.Assert(lower <= nodes.Length);
- Debug.Assert(upper >= 0);
- Debug.Assert(lower >= 0);
- }
-
- bool CompareExchangeIndexes(ref int prevUpper, int newUpper, ref int prevLower, int newLower)
- {
- Debug.Assert(newUpper >= newLower);
- Debug.Assert(newUpper <= nodes.Length);
- Debug.Assert(newLower <= nodes.Length);
- Debug.Assert(newUpper >= 0);
- Debug.Assert(newLower >= 0);
- Debug.Assert(newUpper >= prevUpper);
- Debug.Assert(newLower >= prevLower);
- Debug.Assert(newUpper == prevUpper ^ newLower == prevLower);
-
- int oldIndexes = (prevUpper << 16) | (prevLower & SixteenBits);
- int newIndexes = (newUpper << 16) | (newLower & SixteenBits);
- int prevIndexes = Interlocked.CompareExchange(ref indexes, newIndexes, oldIndexes);
- prevUpper = (prevIndexes >> 16) & SixteenBits;
- prevLower = prevIndexes & SixteenBits;
- return prevIndexes == oldIndexes;
- }
-
- [ReliabilityContract(Consistency.WillNotCorruptState, Cer.MayFail)]
- public QueueSegment()
- {
- Debug.Assert(QueueSegmentLength <= SixteenBits);
- nodes = new IThreadPoolWorkItem[QueueSegmentLength];
- }
-
-
- public bool IsUsedUp()
- {
- int upper, lower;
- GetIndexes(out upper, out lower);
- return (upper == nodes.Length) &&
- (lower == nodes.Length);
- }
-
- public bool TryEnqueue(IThreadPoolWorkItem node)
- {
- //
- // If there's room in this segment, atomically increment the upper count (to reserve
- // space for this node), then store the node.
- // Note that this leaves a window where it will look like there is data in that
- // array slot, but it hasn't been written yet. This is taken care of in TryDequeue
- // with a busy-wait loop, waiting for the element to become non-null. This implies
- // that we can never store null nodes in this data structure.
- //
- Debug.Assert(null != node);
-
- int upper, lower;
- GetIndexes(out upper, out lower);
-
- while (true)
- {
- if (upper == nodes.Length)
- return false;
-
- if (CompareExchangeIndexes(ref upper, upper + 1, ref lower, lower))
- {
- Debug.Assert(Volatile.Read(ref nodes[upper]) == null);
- Volatile.Write(ref nodes[upper], node);
- return true;
- }
- }
- }
-
- [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "Reviewed for thread safety")]
- public bool TryDequeue(out IThreadPoolWorkItem node)
- {
- //
- // If there are nodes in this segment, increment the lower count, then take the
- // element we find there.
- //
- int upper, lower;
- GetIndexes(out upper, out lower);
-
- while(true)
- {
- if (lower == upper)
- {
- node = null;
- return false;
- }
-
- if (CompareExchangeIndexes(ref upper, upper, ref lower, lower + 1))
- {
- // It's possible that a concurrent call to Enqueue hasn't yet
- // written the node reference to the array. We need to spin until
- // it shows up.
- SpinWait spinner = new SpinWait();
- while ((node = Volatile.Read(ref nodes[lower])) == null)
- spinner.SpinOnce();
-
- // Null-out the reference so the object can be GC'd earlier.
- nodes[lower] = null;
-
- return true;
- }
- }
- }
- }
-
- // The head and tail of the queue. We enqueue to the head, and dequeue from the tail.
- internal volatile QueueSegment queueHead;
- internal volatile QueueSegment queueTail;
internal bool loggingEnabled;
+ internal readonly ConcurrentQueue<IThreadPoolWorkItem> workItems = new ConcurrentQueue<IThreadPoolWorkItem>();
internal static readonly SparseArray<WorkStealingQueue> allThreadQueues = new SparseArray<WorkStealingQueue>(16);
public ThreadPoolWorkQueue()
{
- queueTail = queueHead = new QueueSegment();
loggingEnabled = FrameworkEventSource.Log.IsEnabled(EventLevel.Verbose, FrameworkEventSource.Keywords.ThreadPool|FrameworkEventSource.Keywords.ThreadTransfer);
}
}
else
{
- QueueSegment head = queueHead;
-
- while (!head.TryEnqueue(callback))
- {
- Interlocked.CompareExchange(ref head.Next, new QueueSegment(), null);
-
- while (head.Next != null)
- {
- Interlocked.CompareExchange(ref queueHead, head.Next, head);
- head = queueHead;
- }
- }
+ workItems.Enqueue(callback);
}
EnsureThreadRequested();
if (null == callback)
{
- QueueSegment tail = queueTail;
- while (true)
+ if (workItems.TryDequeue(out callback))
{
- if (tail.TryDequeue(out callback))
- {
- Debug.Assert(null != callback);
- break;
- }
-
- if (null == tail.Next || !tail.IsUsedUp())
- {
- break;
- }
- else
- {
- Interlocked.CompareExchange(ref queueTail, tail.Next, tail);
- tail = queueTail;
- }
+ Debug.Assert(null != callback);
}
}
// Get all workitems. Called by TaskScheduler in its debugger hooks.
internal static IEnumerable<IThreadPoolWorkItem> GetQueuedWorkItems()
{
- return EnumerateQueuedWorkItems(ThreadPoolWorkQueue.allThreadQueues.Current, ThreadPoolGlobals.workQueue.queueTail);
- }
-
- internal static IEnumerable<IThreadPoolWorkItem> EnumerateQueuedWorkItems(ThreadPoolWorkQueue.WorkStealingQueue[] wsQueues, ThreadPoolWorkQueue.QueueSegment globalQueueTail)
- {
- if (wsQueues != null)
+ // Enumerate global queue
+ foreach (IThreadPoolWorkItem workItem in ThreadPoolGlobals.workQueue.workItems)
{
- // First, enumerate all workitems in thread-local queues.
- foreach (ThreadPoolWorkQueue.WorkStealingQueue wsq in wsQueues)
- {
- if (wsq != null && wsq.m_array != null)
- {
- IThreadPoolWorkItem[] items = wsq.m_array;
- for (int i = 0; i < items.Length; i++)
- {
- IThreadPoolWorkItem item = items[i];
- if (item != null)
- yield return item;
- }
- }
- }
+ yield return workItem;
}
- if (globalQueueTail != null)
+ // Enumerate each local queue
+ foreach (ThreadPoolWorkQueue.WorkStealingQueue wsq in ThreadPoolWorkQueue.allThreadQueues.Current)
{
- // Now the global queue
- for (ThreadPoolWorkQueue.QueueSegment segment = globalQueueTail;
- segment != null;
- segment = segment.Next)
+ if (wsq != null && wsq.m_array != null)
{
- IThreadPoolWorkItem[] items = segment.nodes;
+ IThreadPoolWorkItem[] items = wsq.m_array;
for (int i = 0; i < items.Length; i++)
{
IThreadPoolWorkItem item = items[i];
if (item != null)
+ {
yield return item;
+ }
}
}
}
internal static IEnumerable<IThreadPoolWorkItem> GetLocallyQueuedWorkItems()
{
- return EnumerateQueuedWorkItems(new ThreadPoolWorkQueue.WorkStealingQueue[] { ThreadPoolWorkQueueThreadLocals.threadLocals.workStealingQueue }, null);
+ ThreadPoolWorkQueue.WorkStealingQueue wsq = ThreadPoolWorkQueueThreadLocals.threadLocals.workStealingQueue;
+ if (wsq != null && wsq.m_array != null)
+ {
+ IThreadPoolWorkItem[] items = wsq.m_array;
+ for (int i = 0; i < items.Length; i++)
+ {
+ IThreadPoolWorkItem item = items[i];
+ if (item != null)
+ yield return item;
+ }
+ }
}
- internal static IEnumerable<IThreadPoolWorkItem> GetGloballyQueuedWorkItems()
- {
- return EnumerateQueuedWorkItems(null, ThreadPoolGlobals.workQueue.queueTail);
- }
+ internal static IEnumerable<IThreadPoolWorkItem> GetGloballyQueuedWorkItems() => ThreadPoolGlobals.workQueue.workItems;
private static object[] ToObjectArray(IEnumerable<IThreadPoolWorkItem> workitems)
{