1 // ***********************************************************************
2 // Copyright (c) 2012 Charlie Poole
4 // Permission is hereby granted, free of charge, to any person obtaining
5 // a copy of this software and associated documentation files (the
6 // "Software"), to deal in the Software without restriction, including
7 // without limitation the rights to use, copy, modify, merge, publish,
8 // distribute, sublicense, and/or sell copies of the Software, and to
9 // permit persons to whom the Software is furnished to do so, subject to
10 // the following conditions:
12 // The above copyright notice and this permission notice shall be
13 // included in all copies or substantial portions of the Software.
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
16 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
17 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
18 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
19 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
20 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
21 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
22 // ***********************************************************************
25 #define NUNIT_FRAMEWORK
31 using System.Collections.Concurrent;
32 using System.Collections.Generic;
33 using System.Threading;
34 #if NET_2_0 || NET_3_5 || NETCF
35 using ManualResetEventSlim = System.Threading.ManualResetEvent;
38 namespace NUnit.Framework.Internal.Execution
41 /// WorkItemQueueState indicates the current state of a WorkItemQueue
43 public enum WorkItemQueueState
46 /// The queue is paused
51 /// The queue is running
56 /// The queue is stopped
62 /// A WorkItemQueue holds work items that are ready to
63 /// be run, either initially or after some dependency
64 /// has been satisfied.
66 public class WorkItemQueue
68 private const int spinCount = 5;
70 private Logger log = InternalTrace.GetLogger("WorkItemQueue");
72 private readonly ConcurrentQueue<WorkItem> _innerQueue = new ConcurrentQueue<WorkItem>();
74 /* This event is used solely for the purpose of having an optimized sleep cycle when
75 * we have to wait on an external event (Add or Remove for instance)
77 private readonly ManualResetEventSlim _mreAdd = new ManualResetEventSlim(false);
79 /* The whole idea is to use these two values in a transactional
80 * way to track and manage the actual data inside the underlying lock-free collection
81 * instead of directly working with it or using external locking.
83 * They are manipulated with CAS and are guaranteed to increase over time and use
84 * of the instance thus preventing ABA problems.
86 private int _addId = int.MinValue;
87 private int _removeId = int.MinValue;
90 /// Initializes a new instance of the <see cref="WorkItemQueue"/> class.
92 /// <param name="name">The name of the queue.</param>
93 public WorkItemQueue(string name)
96 State = WorkItemQueueState.Paused;
104 /// Gets the name of the work item queue.
106 public string Name { get; private set; }
108 private int _itemsProcessed;
110 /// Gets the total number of items processed so far
112 public int ItemsProcessed
114 get { return _itemsProcessed; }
115 private set { _itemsProcessed = value; }
118 private int _maxCount;
121 /// Gets the maximum number of work items.
125 get { return _maxCount; }
126 private set { _maxCount = value; }
131 /// Gets the current state of the queue
133 public WorkItemQueueState State
135 get { return (WorkItemQueueState)_state; }
136 private set { _state = (int)value; }
140 /// Get a bool indicating whether the queue is empty.
144 get { return _innerQueue.IsEmpty; }
149 #region Public Methods
152 /// Enqueue a WorkItem to be processed
154 /// <param name="work">The WorkItem to process</param>
155 public void Enqueue(WorkItem work)
159 int cachedAddId = _addId;
161 // Validate that we have are the current enqueuer
162 if (Interlocked.CompareExchange(ref _addId, cachedAddId + 1, cachedAddId) != cachedAddId)
165 // Add to the collection
166 _innerQueue.Enqueue(work);
168 // Set MaxCount using CAS
169 int i, j = _maxCount;
173 j = Interlocked.CompareExchange(ref _maxCount, Math.Max(i, _innerQueue.Count), i);
177 // Wake up threads that may have been sleeping
185 /// Dequeue a WorkItem for processing
187 /// <returns>A WorkItem or null if the queue has stopped</returns>
188 public WorkItem Dequeue()
190 SpinWait sw = new SpinWait();
194 WorkItemQueueState cachedState = State;
196 if (cachedState == WorkItemQueueState.Stopped)
197 return null; // Tell worker to terminate
199 int cachedRemoveId = _removeId;
200 int cachedAddId = _addId;
202 // Empty case (or paused)
203 if (cachedRemoveId == cachedAddId || cachedState == WorkItemQueueState.Paused)
205 // Spin a few times to see if something changes
206 if (sw.Count <= spinCount)
212 // Reset to wait for an enqueue
215 // Recheck for an enqueue to avoid a Wait
216 if ((cachedRemoveId != _removeId || cachedAddId != _addId) && cachedState != WorkItemQueueState.Paused)
218 // Queue is not empty, set the event
223 // Wait for something to happen
230 // Validate that we are the current dequeuer
231 if (Interlocked.CompareExchange(ref _removeId, cachedRemoveId + 1, cachedRemoveId) != cachedRemoveId)
235 // Dequeue our work item
237 while (!_innerQueue.TryDequeue(out work)) { };
239 // Add to items processed using CAS
240 Interlocked.Increment(ref _itemsProcessed);
247 /// Start or restart processing of items from the queue
251 log.Info("{0} starting", Name);
253 if (Interlocked.CompareExchange(ref _state, (int)WorkItemQueueState.Running, (int)WorkItemQueueState.Paused) == (int)WorkItemQueueState.Paused)
258 /// Signal the queue to stop
262 log.Info("{0} stopping - {1} WorkItems processed, max size {2}", Name, ItemsProcessed, MaxCount);
264 if (Interlocked.Exchange(ref _state, (int)WorkItemQueueState.Stopped) != (int)WorkItemQueueState.Stopped)
269 /// Pause the queue for restarting later
273 log.Info("{0} pausing", Name);
275 Interlocked.CompareExchange(ref _state, (int)WorkItemQueueState.Paused, (int)WorkItemQueueState.Running);
281 #if NET_2_0 || NET_3_5 || NETCF
282 internal static class ManualResetEventExtensions
284 public static bool Wait (this ManualResetEvent mre, int millisecondsTimeout)
286 return mre.WaitOne(millisecondsTimeout, false);