public static WorkStealingQueue[] Queues => _queues;
+ // Track whether the WorkStealingQueueList is empty
+ // Three states simplifies race conditions. They may be considered.
+ // Now Active --> Maybe Inactive -> Confirmed Inactive
+ public const int WsqNowActive = 2;
+ public static int wsqActive;
+
public static void Add(WorkStealingQueue queue)
{
Debug.Assert(queue != null);
if (null != tl)
{
tl.workStealingQueue.LocalPush(callback);
+
+ // We must guarantee wsqActive is set to WsqNowActive after we push
+ // The ordering must be global because we rely on other threads
+ // observing in this order
+ Interlocked.MemoryBarrier();
+
+ // We do not want to simply write. We want to prevent unnecessary writes
+ // which would invalidate reader's caches
+ if (WorkStealingQueueList.wsqActive != WorkStealingQueueList.WsqNowActive)
+ {
+ Volatile.Write(ref WorkStealingQueueList.wsqActive, WorkStealingQueueList.WsqNowActive);
+ }
}
else
{
public IThreadPoolWorkItem Dequeue(ThreadPoolWorkQueueThreadLocals tl, ref bool missedSteal)
{
- WorkStealingQueue localWsq = tl.workStealingQueue;
IThreadPoolWorkItem callback;
-
- if ((callback = localWsq.LocalPop()) == null && // first try the local queue
- !workItems.TryDequeue(out callback)) // then try the global queue
+ int wsqActiveObserved = WorkStealingQueueList.wsqActive;
+ if (wsqActiveObserved > 0)
{
- // finally try to steal from another thread's local queue
- WorkStealingQueue[] queues = WorkStealingQueueList.Queues;
- int c = queues.Length;
- Debug.Assert(c > 0, "There must at least be a queue for this thread.");
- int maxIndex = c - 1;
- int i = tl.random.Next(c);
- while (c > 0)
+ WorkStealingQueue localWsq = tl.workStealingQueue;
+
+ if ((callback = localWsq.LocalPop()) == null && // first try the local queue
+ !workItems.TryDequeue(out callback)) // then try the global queue
{
- i = (i < maxIndex) ? i + 1 : 0;
- WorkStealingQueue otherQueue = queues[i];
- if (otherQueue != localWsq && otherQueue.CanSteal)
+ // finally try to steal from another thread's local queue
+ WorkStealingQueue[] queues = WorkStealingQueueList.Queues;
+ int c = queues.Length;
+ Debug.Assert(c > 0, "There must at least be a queue for this thread.");
+ int maxIndex = c - 1;
+ int i = tl.random.Next(c);
+ while (c > 0)
{
- callback = otherQueue.TrySteal(ref missedSteal);
- if (callback != null)
+ i = (i < maxIndex) ? i + 1 : 0;
+ WorkStealingQueue otherQueue = queues[i];
+ if (otherQueue != localWsq && otherQueue.CanSteal)
{
- break;
+ callback = otherQueue.TrySteal(ref missedSteal);
+ if (callback != null)
+ {
+ break;
+ }
}
+ c--;
+ }
+ if ((callback == null) && !missedSteal)
+ {
+ // Only decrement if the value is unchanged since we started looking for work
+ // This prevents multiple threads decrementing based on overlapping scans.
+ //
+ // When we decrement from active, the producer may have inserted a queue item during our scan
+ // therefore we cannot transition to empty
+ //
+ // When we decrement from Maybe Inactive, if the producer inserted a queue item during our scan,
+ // the producer must write Active. We may transition to empty briefly if we beat the
+ // producer's write, but the producer will then overwrite us before waking threads.
+ // So effectively we cannot mark the queue empty when an item is in the queue.
+ Interlocked.CompareExchange(ref WorkStealingQueueList.wsqActive, wsqActiveObserved - 1, wsqActiveObserved);
}
- c--;
}
}
+ else
+ {
+ // We only need to look at the global queue since WorkStealingQueueList is inactive
+ workItems.TryDequeue(out callback);
+ }
return callback;
}