public static WorkStealingQueue[] Queues => _queues;
- // Track whether the WorkStealingQueueList is empty
- // Three states simplifies race conditions. They may be considered.
- // Now Active --> Maybe Inactive -> Confirmed Inactive
- public const int WsqNowActive = 2;
- public static int wsqActive;
-
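For context on what this change removes: wsqActive was a three-state hint (Now Active --> Maybe Inactive --> Confirmed Inactive) telling consumers whether any work-stealing queue might still hold work. Below is a minimal standalone sketch of that state machine, using illustrative names rather than the actual ThreadPool fields.

using System.Threading;

static class ActivityHint
{
    public const int NowActive = 2;         // a producer pushed since the last empty scan
    public const int MaybeInactive = 1;     // one full scan found nothing
    public const int ConfirmedInactive = 0; // two consecutive empty scans with no intervening push
    private static int s_state;

    // Producer side: mark the queues as possibly non-empty after pushing an item.
    public static void NotePush() => Volatile.Write(ref s_state, NowActive);

    // Consumer side: after a scan that found nothing, step the state down by one,
    // but only from the value observed before the scan, so a concurrent push
    // (which resets to NowActive) is never overwritten with a lower state.
    public static void NoteEmptyScan(int observedBeforeScan)
    {
        if (observedBeforeScan > 0)
            Interlocked.CompareExchange(ref s_state, observedBeforeScan - 1, observedBeforeScan);
    }

    public static int Read() => Volatile.Read(ref s_state);
}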
public static void Add(WorkStealingQueue queue)
{
Debug.Assert(queue != null);
    // ... (rest of Add, which publishes the new queue into _queues, unchanged)
}

public ThreadPoolWorkQueueThreadLocals EnsureCurrentThreadHasQueue() =>
    ThreadPoolWorkQueueThreadLocals.threadLocals ??
    (ThreadPoolWorkQueueThreadLocals.threadLocals = new ThreadPoolWorkQueueThreadLocals(this));
- internal bool ThreadRequestNeeded(int count) => (count < ThreadPoolGlobals.processorCount) &&
- (!workItems.IsEmpty || (WorkStealingQueueList.wsqActive > 0));
-
internal void EnsureThreadRequested()
{
//
// If we have not yet requested #procs threads from the VM, then request a new thread
// as needed
//
// Note that there is a separate count in the VM which will also be incremented in this case,
// which is handled by RequestWorkerThread.
//
int count = numOutstandingThreadRequests;
- while (ThreadRequestNeeded(count))
+ while (count < ThreadPoolGlobals.processorCount)
{
int prev = Interlocked.CompareExchange(ref numOutstandingThreadRequests, count + 1, count);
if (prev == count)
{
    ThreadPool.RequestWorkerThread();
    break;
}
count = prev;
}
}
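EnsureThreadRequested above, together with MarkThreadRequestSatisfied below, forms a "bounded outstanding requests" CAS counter capped at the processor count. A self-contained sketch of the same pattern, with a hypothetical requestWorker callback standing in for ThreadPool.RequestWorkerThread:

using System;
using System.Threading;

class OutstandingRequestCounter
{
    private int _outstanding;
    private readonly int _limit = Environment.ProcessorCount;
    private readonly Action _requestWorker;   // hypothetical stand-in for ThreadPool.RequestWorkerThread

    public OutstandingRequestCounter(Action requestWorker) => _requestWorker = requestWorker;

    // Ask for a worker only while fewer than _limit requests are outstanding.
    public void EnsureRequested()
    {
        int count = _outstanding;
        while (count < _limit)
        {
            int prev = Interlocked.CompareExchange(ref _outstanding, count + 1, count);
            if (prev == count)
            {
                _requestWorker();
                break;
            }
            count = prev;   // lost the race; retry with the freshly observed value
        }
    }

    // A worker arrived: release one slot so future EnsureRequested calls can ask again.
    public void MarkSatisfied()
    {
        int count = _outstanding;
        while (count > 0)
        {
            int prev = Interlocked.CompareExchange(ref _outstanding, count - 1, count);
            if (prev == count)
                break;
            count = prev;
        }
    }
}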
- internal void MarkThreadRequestSatisfied(bool dequeueSuccessful)
+ internal void MarkThreadRequestSatisfied()
{
//
// The VM has called us, so one of our outstanding thread requests has been satisfied.
// Decrement the count so that future calls to EnsureThreadRequested will succeed.
// Note that there is a separate count in the VM which has already been decremented by the VM
// by the time we reach this point.
//
int count = numOutstandingThreadRequests;
-
while (count > 0)
{
- if (dequeueSuccessful && (count == ThreadPoolGlobals.processorCount) && ThreadRequestNeeded(count - 1))
- {
- // If we gated threads due to too many outstanding requests and queue was not empty
- // Request another thread.
- ThreadPool.RequestWorkerThread();
- return;
- }
-
int prev = Interlocked.CompareExchange(ref numOutstandingThreadRequests, count - 1, count);
if (prev == count)
{
    break;
}
count = prev;
}
}
if (null != tl)
{
tl.workStealingQueue.LocalPush(callback);
-
- // We must guarantee wsqActive is set to WsqNowActive after we push
- // The ordering must be global because we rely on other threads
- // observing in this order
- Interlocked.MemoryBarrier();
-
- // We do not want to simply write. We want to prevent unnecessary writes
- // which would invalidate reader's caches
- if (WorkStealingQueueList.wsqActive != WorkStealingQueueList.WsqNowActive)
- {
- Volatile.Write(ref WorkStealingQueueList.wsqActive, WorkStealingQueueList.WsqNowActive);
- }
}
else
{
    workItems.Enqueue(callback);
}
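The block deleted from the LocalPush path above carried two ordering arguments worth keeping in view: the pushed item must be globally visible before the active flag is raised, and the flag should only be written when it actually changes so readers' cache lines are not invalidated on every enqueue. A minimal sketch of that publish idiom, with an illustrative s_flag field:

using System.Threading;

static class PublishThenSignal
{
    private static int s_flag;      // illustrative "work may be available" flag
    private const int Raised = 1;

    public static void AfterPush()
    {
        // Full fence: the push that preceded this call must be visible to all
        // threads before the flag write below can be observed.
        Interlocked.MemoryBarrier();

        // Avoid redundant stores: an unconditional write dirties the cache line
        // holding s_flag on every enqueue, forcing polling readers to re-fetch it.
        if (Volatile.Read(ref s_flag) != Raised)
            Volatile.Write(ref s_flag, Raised);
    }
}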
public IThreadPoolWorkItem Dequeue(ThreadPoolWorkQueueThreadLocals tl, ref bool missedSteal)
{
+ WorkStealingQueue localWsq = tl.workStealingQueue;
IThreadPoolWorkItem callback;
- int wsqActiveObserved = WorkStealingQueueList.wsqActive;
- if (wsqActiveObserved > 0)
- {
- WorkStealingQueue localWsq = tl.workStealingQueue;
- if ((callback = localWsq.LocalPop()) == null && // first try the local queue
- !workItems.TryDequeue(out callback)) // then try the global queue
+ if ((callback = localWsq.LocalPop()) == null && // first try the local queue
+ !workItems.TryDequeue(out callback)) // then try the global queue
+ {
+ // finally try to steal from another thread's local queue
+ WorkStealingQueue[] queues = WorkStealingQueueList.Queues;
+ int c = queues.Length;
+ Debug.Assert(c > 0, "There must at least be a queue for this thread.");
+ int maxIndex = c - 1;
+ int i = tl.random.Next(c);
+ while (c > 0)
{
- // finally try to steal from another thread's local queue
- WorkStealingQueue[] queues = WorkStealingQueueList.Queues;
- int c = queues.Length;
- Debug.Assert(c > 0, "There must at least be a queue for this thread.");
- int maxIndex = c - 1;
- int i = tl.random.Next(c);
- while (c > 0)
+ i = (i < maxIndex) ? i + 1 : 0;
+ WorkStealingQueue otherQueue = queues[i];
+ if (otherQueue != localWsq && otherQueue.CanSteal)
{
- i = (i < maxIndex) ? i + 1 : 0;
- WorkStealingQueue otherQueue = queues[i];
- if (otherQueue != localWsq && otherQueue.CanSteal)
+ callback = otherQueue.TrySteal(ref missedSteal);
+ if (callback != null)
{
- callback = otherQueue.TrySteal(ref missedSteal);
- if (callback != null)
- {
- break;
- }
+ break;
}
- c--;
- }
- if ((callback == null) && !missedSteal)
- {
- // Only decrement if the value is unchanged since we started looking for work
- // This prevents multiple threads decrementing based on overlapping scans.
- //
- // When we decrement from active, the producer may have inserted a queue item during our scan
- // therefore we cannot transition to empty
- //
- // When we decrement from Maybe Inactive, if the producer inserted a queue item during our scan,
- // the producer must write Active. We may transition to empty briefly if we beat the
- // producer's write, but the producer will then overwrite us before waking threads.
- // So effectively we cannot mark the queue empty when an item is in the queue.
- Interlocked.CompareExchange(ref WorkStealingQueueList.wsqActive, wsqActiveObserved - 1, wsqActiveObserved);
}
+ c--;
}
}
- else
- {
- // We only need to look at the global queue since WorkStealingQueueList is inactive
- workItems.TryDequeue(out callback);
- }
return callback;
}
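The retained Dequeue path prefers the caller's local queue, then the global queue, and only then steals, scanning the other queues from a random starting index and wrapping around. A simplified, self-contained sketch of that three-tier order, where ConcurrentStack and ConcurrentQueue stand in for the real work-stealing structures:

using System;
using System.Collections.Concurrent;

class TieredDequeue<T>
{
    // Simplified stand-ins: a LIFO local stack per consumer, one shared FIFO
    // global queue, and an array of peers to steal from.
    public ConcurrentStack<T> Local = new ConcurrentStack<T>();
    public ConcurrentQueue<T> Global;
    public TieredDequeue<T>[] All;   // includes this instance at index OwnIndex
    public int OwnIndex;
    public Random Random = new Random();

    public bool TryDequeue(out T item)
    {
        if (Local.TryPop(out item))          // 1. newest local work first (cache-warm)
            return true;
        if (Global.TryDequeue(out item))     // 2. then the shared global queue
            return true;

        // 3. finally steal: start at a random peer and wrap around, so concurrent
        //    consumers do not all hit the same victim first.
        int c = All.Length;
        int maxIndex = c - 1;
        int i = Random.Next(c);
        while (c > 0)
        {
            i = (i < maxIndex) ? i + 1 : 0;
            if (i != OwnIndex && All[i].Local.TryPop(out item))   // a real work-stealing deque steals from the opposite end
                return true;
            c--;
        }
        return false;
    }
}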
//
int quantumStartTime = Environment.TickCount;
- bool markThreadRequestSatisfied = true;
+ //
+ // Update our records to indicate that an outstanding request for a thread has now been fulfilled.
+ // From this point on, we are responsible for requesting another thread if we stop working for any
+ // reason, and we believe there might still be work in the queue.
+ //
+ // Note that if this thread is aborted before we get a chance to request another one, the VM will
+ // record a thread request on our behalf. So we don't need to worry about getting aborted right here.
+ //
+ workQueue.MarkThreadRequestSatisfied();
// Has the desire for logging changed since the last time we entered?
workQueue.loggingEnabled = FrameworkEventSource.Log.IsEnabled(EventLevel.Verbose, FrameworkEventSource.Keywords.ThreadPool | FrameworkEventSource.Keywords.ThreadTransfer);
// If we found work, there may be more work. Ask for another thread so that the other work can be processed
// in parallel. Note that this will only ask for a max of #procs threads, so it's safe to call it for every dequeue.
//
- if (markThreadRequestSatisfied)
- {
- //
- // Update our records to indicate that an outstanding request for a thread has now been fulfilled
- // and that an item was successfully dispatched and another thread may be needed
- //
- // From this point on, we are responsible for requesting another thread if we stop working for any
- // reason, and we believe there might still be work in the queue.
- //
- // Note that if this thread is aborted before we get a chance to request another one, the VM will
- // record a thread request on our behalf. So we don't need to worry about getting aborted right here.
- //
- workQueue.MarkThreadRequestSatisfied(true);
- markThreadRequestSatisfied = false;
- }
+ workQueue.EnsureThreadRequested();
//
// Execute the workitem outside of any finally blocks, so that it can be aborted if needed.
}
finally
{
- if (markThreadRequestSatisfied)
- {
- //
- // Update our records to indicate that an outstanding request for a thread has now been fulfilled
- // and that an item was not successfully dispatched. We will request thread below if needed
- //
- workQueue.MarkThreadRequestSatisfied(false);
- }
-
//
// If we are exiting for any reason other than that the queue is definitely empty, ask for another
// thread to pick up where we left off.
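With the markThreadRequestSatisfied flag gone, Dispatch marks its request satisfied up front and relies on this finally block to ask for a replacement thread unless the queue was observed to be definitely empty. A rough, self-contained sketch of that hand-off shape; the delegate names are hypothetical stand-ins for the real work-queue calls:

using System;

static class DispatchShape
{
    public static void Run(Action markSatisfied, Action ensureRequested, Func<bool> runUntilQueueDefinitelyEmptyOrQuantumEnds)
    {
        markSatisfied();                  // this worker consumes one outstanding thread request

        bool needAnotherThread = true;    // assume more work may remain unless proven otherwise
        try
        {
            // Returns true only when the queue was observed definitely empty.
            needAnotherThread = !runUntilQueueDefinitelyEmptyOrQuantumEnds();
        }
        finally
        {
            // Exiting for any other reason (quantum expired, exception, missed steal):
            // hand the baton to another worker so queued items are not stranded.
            if (needAnotherThread)
                ensureRequested();
        }
    }
}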