From 0b73485685c61b0e46585c93dd7cd5355c137b5f Mon Sep 17 00:00:00 2001 From: Stephen Toub Date: Tue, 31 Jan 2017 11:41:36 -0500 Subject: [PATCH] Port ConcurrentQueue back from corefx It's not exposed from CoreLib, but a) as long as the code is here it's good to have it in sync, and b) we can use it in ThreadPool rather than having a separate concurrent queue implementation (which is very similar to the old ConcurrentQueue design). Commit migrated from https://github.com/dotnet/coreclr/commit/9cd40ee1ac4705de5542acaf8210c382b5c18087 --- .../Collections/Concurrent/ConcurrentQueue.cs | 1461 +++++++++++--------- 1 file changed, 825 insertions(+), 636 deletions(-) diff --git a/src/coreclr/src/mscorlib/src/System/Collections/Concurrent/ConcurrentQueue.cs b/src/coreclr/src/mscorlib/src/System/Collections/Concurrent/ConcurrentQueue.cs index 7aa5971..5828369 100644 --- a/src/coreclr/src/mscorlib/src/System/Collections/Concurrent/ConcurrentQueue.cs +++ b/src/coreclr/src/mscorlib/src/System/Collections/Concurrent/ConcurrentQueue.cs @@ -1,67 +1,100 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. -#pragma warning disable 0420 - -// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ -// -// -// -// A lock-free, concurrent queue primitive, and its associated debugger view type. -// -// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- - -using System; -using System.Collections; using System.Collections.Generic; using System.Diagnostics; -using System.Diagnostics.Contracts; -using System.Runtime.ConstrainedExecution; using System.Runtime.InteropServices; using System.Runtime.Serialization; -using System.Security; -using System.Security.Permissions; using System.Threading; namespace System.Collections.Concurrent { - /// /// Represents a thread-safe first-in, first-out collection of objects. /// /// Specifies the type of elements in the queue. /// - /// All public and protected members of are thread-safe and may be used + /// All public and protected members of are thread-safe and may be used /// concurrently from multiple threads. /// - [ComVisible(false)] [DebuggerDisplay("Count = {Count}")] [DebuggerTypeProxy(typeof(SystemCollectionsConcurrent_ProducerConsumerCollectionDebugView<>))] [Serializable] public class ConcurrentQueue : IProducerConsumerCollection, IReadOnlyCollection { - //fields of ConcurrentQueue - [NonSerialized] - private volatile Segment m_head; + // This implementation provides an unbounded, multi-producer multi-consumer queue + // that supports the standard Enqueue/TryDequeue operations, as well as support for + // snapshot enumeration (GetEnumerator, ToArray, CopyTo), peeking, and Count/IsEmpty. + // It is composed of a linked list of bounded ring buffers, each of which has a head + // and a tail index, isolated from each other to minimize false sharing. As long as + // the number of elements in the queue remains less than the size of the current + // buffer (Segment), no additional allocations are required for enqueued items. When + // the number of items exceeds the size of the current segment, the current segment is + // "frozen" to prevent further enqueues, and a new segment is linked from it and set + // as the new tail segment for subsequent enqueues. 
As old segments are consumed by + // dequeues, the head reference is updated to point to the segment that dequeuers should + // try next. To support snapshot enumeration, segments also support the notion of + // preserving for observation, whereby they avoid overwriting state as part of dequeues. + // Any operation that requires a snapshot results in all current segments being + // both frozen for enqueues and preserved for observation: any new enqueues will go + // to new segments, and dequeuers will consume from the existing segments but without + // overwriting the existing data. + + /// Initial length of the segments used in the queue. + private const int InitialSegmentLength = 32; + /// + /// Maximum length of the segments used in the queue. This is a somewhat arbitrary limit: + /// larger means that as long as we don't exceed the size, we avoid allocating more segments, + /// but if we do exceed it, then the segment becomes garbage. + /// + private const int MaxSegmentLength = 1024 * 1024; + /// + /// Lock used to protect cross-segment operations, including any updates to or + /// and any operations that need to get a consistent view of them. + /// [NonSerialized] - private volatile Segment m_tail; - - private T[] m_serializationArray; // Used for custom serialization. - - private const int SEGMENT_SIZE = 32; - - //number of snapshot takers, GetEnumerator(), ToList() and ToArray() operations take snapshot. + private object _crossSegmentLock; + /// The current tail segment. [NonSerialized] - internal volatile int m_numSnapshotTakers = 0; + private volatile Segment _tail; + /// The current head segment. + [NonSerialized] + private volatile Segment _head; + /// Field used to temporarily store the contents of the queue for serialization. + private T[] _serializationArray; /// /// Initializes a new instance of the class. /// public ConcurrentQueue() { - m_head = m_tail = new Segment(0, this); + _crossSegmentLock = new object(); + _tail = _head = new Segment(InitialSegmentLength); + } + + /// Set the data array to be serialized. + [OnSerializing] + private void OnSerializing(StreamingContext context) + { + _serializationArray = ToArray(); + } + + /// Clear the data array that was serialized. + [OnSerialized] + private void OnSerialized(StreamingContext context) + { + _serializationArray = null; + } + + /// Construct the queue from the deserialized . + [OnDeserialized] + private void OnDeserialized(StreamingContext context) + { + Debug.Assert(_serializationArray != null); + InitializeFromCollection(_serializationArray); + _serializationArray = null; } /// @@ -70,34 +103,39 @@ namespace System.Collections.Concurrent /// A collection from which to copy elements. private void InitializeFromCollection(IEnumerable collection) { - Segment localTail = new Segment(0, this);//use this local variable to avoid the extra volatile read/write. this is safe because it is only called from ctor - m_head = localTail; - - int index = 0; - foreach (T element in collection) + _crossSegmentLock = new object(); + + // Determine the initial segment size. We'll use the default, + // unless the collection is known to be larger than than, in which + // case we round its length up to a power of 2, as all segments must + // be a power of 2 in length. 
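            // Illustrative aside: with the bit-smearing RoundUpToPowerOf2 helper defined later in
            // this file, RoundUpToPowerOf2(33) == 64, RoundUpToPowerOf2(1024) == 1024, and
            // RoundUpToPowerOf2(1025) == 2048. Keeping every segment length a power of 2 is what
            // lets a Segment index its slots with "& _slotsMask" instead of a more expensive "%".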
+ int length = InitialSegmentLength; + var c = collection as ICollection; + if (c != null) { - Debug.Assert(index >= 0 && index < SEGMENT_SIZE); - localTail.UnsafeAdd(element); - index++; - - if (index >= SEGMENT_SIZE) + int count = c.Count; + if (count > length) { - localTail = localTail.UnsafeGrow(); - index = 0; + length = RoundUpToPowerOf2(count); } } - m_tail = localTail; + // Initialize the segment and add all of the data to it. + _tail = _head = new Segment(length); + foreach (T item in collection) + { + Enqueue(item); + } } /// - /// Initializes a new instance of the - /// class that contains elements copied from the specified collection + /// Initializes a new instance of the class that contains elements copied + /// from the specified collection. /// - /// The collection whose elements are copied to the new . - /// The argument is - /// null. + /// + /// The collection whose elements are copied to the new . + /// + /// The argument is null. public ConcurrentQueue(IEnumerable collection) { if (collection == null) @@ -109,37 +147,15 @@ namespace System.Collections.Concurrent } /// - /// Get the data array to be serialized + /// Copies the elements of the to an , starting at a particular index. /// - [OnSerializing] - private void OnSerializing(StreamingContext context) - { - // save the data into the serialization array to be saved - m_serializationArray = ToArray(); - } - - /// - /// Construct the queue from a previously seiralized one - /// - [OnDeserialized] - private void OnDeserialized(StreamingContext context) - { - Debug.Assert(m_serializationArray != null); - InitializeFromCollection(m_serializationArray); - m_serializationArray = null; - } - - /// - /// Copies the elements of the to an , starting at a particular - /// index. - /// - /// The one-dimensional Array that is the - /// destination of the elements copied from the - /// . The Array must have zero-based indexing. - /// The zero-based index in at which copying - /// begins. + /// + /// The one-dimensional Array that is the destination of the + /// elements copied from the . must have + /// zero-based indexing. + /// + /// The zero-based index in at which copying begins. /// is a null reference (Nothing in /// Visual Basic). /// is less than @@ -148,72 +164,58 @@ namespace System.Collections.Concurrent /// is multidimensional. -or- /// does not have zero-based indexing. -or- /// is equal to or greater than the length of the - /// -or- The number of elements in the source is + /// -or- The number of elements in the source is /// greater than the available space from to the end of the destination /// . -or- The type of the source cannot be cast automatically to the type of the + /// cref="ICollection"/> cannot be cast automatically to the type of the /// destination . /// void ICollection.CopyTo(Array array, int index) { + // Special-case when the Array is actually a T[], taking a faster path + T[] szArray = array as T[]; + if (szArray != null) + { + CopyTo(szArray, index); + return; + } + // Validate arguments. if (array == null) { throw new ArgumentNullException(nameof(array)); } - // We must be careful not to corrupt the array, so we will first accumulate an - // internal list of elements that we will then copy to the array. This requires - // some extra allocation, but is necessary since we don't know up front whether - // the array is sufficiently large to hold the stack's contents. 
- ((ICollection)ToList()).CopyTo(array, index); + // Otherwise, fall back to the slower path that first copies the contents + // to an array, and then uses that array's non-generic CopyTo to do the copy. + ToArray().CopyTo(array, index); } /// - /// Gets a value indicating whether access to the is + /// Gets a value indicating whether access to the is /// synchronized with the SyncRoot. /// - /// true if access to the is synchronized + /// true if access to the is synchronized /// with the SyncRoot; otherwise, false. For , this property always /// returns false. - bool ICollection.IsSynchronized - { - // Gets a value indicating whether access to this collection is synchronized. Always returns - // false. The reason is subtle. While access is in face thread safe, it's not the case that - // locking on the SyncRoot would have prevented concurrent pushes and pops, as this property - // would typically indicate; that's because we internally use CAS operations vs. true locks. - get { return false; } - } - + bool ICollection.IsSynchronized => false; // always false, as true implies synchronization via SyncRoot /// /// Gets an object that can be used to synchronize access to the . This property is not supported. + /// cref="ICollection"/>. This property is not supported. /// - /// The SyncRoot property is not supported. - object ICollection.SyncRoot - { - get - { - throw new NotSupportedException(Environment.GetResourceString("ConcurrentCollection_SyncRoot_NotSupported")); - } - } + /// The SyncRoot property is not supported. + object ICollection.SyncRoot { get { throw new NotSupportedException(Environment.GetResourceString("ConcurrentCollection_SyncRoot_NotSupported")); } } - /// - /// Returns an enumerator that iterates through a collection. - /// - /// An that can be used to iterate through the collection. - IEnumerator IEnumerable.GetEnumerator() - { - return ((IEnumerable)this).GetEnumerator(); - } + /// Returns an enumerator that iterates through a collection. + /// An that can be used to iterate through the collection. + IEnumerator IEnumerable.GetEnumerator() => ((IEnumerable)this).GetEnumerator(); /// - /// Attempts to add an object to the . + /// Attempts to add an object to the . /// /// The object to add to the . The value can be a null + /// cref="Concurrent.IProducerConsumerCollection{T}"/>. The value can be a null /// reference (Nothing in Visual Basic) for reference types. /// /// true if the object was added successfully; otherwise, false. @@ -227,21 +229,17 @@ namespace System.Collections.Concurrent } /// - /// Attempts to remove and return an object from the . + /// Attempts to remove and return an object from the . /// /// /// When this method returns, if the operation was successful, contains the /// object removed. If no object was available to be removed, the value is unspecified. /// - /// true if an element was removed and returned succesfully; otherwise, false. + /// true if an element was removed and returned successfully; otherwise, false. /// For , this operation will attempt to remove the object /// from the beginning of the . /// - bool IProducerConsumerCollection.TryTake(out T item) - { - return TryDequeue(out item); - } + bool IProducerConsumerCollection.TryTake(out T item) => TryDequeue(out item); /// /// Gets a value that indicates whether the is empty. 
@@ -258,126 +256,43 @@ namespace System.Collections.Concurrent { get { - Segment head = m_head; - if (!head.IsEmpty) - //fast route 1: - //if current head is not empty, then queue is not empty - return false; - else if (head.Next == null) - //fast route 2: - //if current head is empty and it's the last segment - //then queue is empty - return true; - else - //slow route: - //current head is empty and it is NOT the last segment, - //it means another thread is growing new segment - { - SpinWait spin = new SpinWait(); - while (head.IsEmpty) - { - if (head.Next == null) - return true; - - spin.SpinOnce(); - head = m_head; - } - return false; - } + // IsEmpty == !TryPeek. We use a "resultUsed:false" peek in order to avoid marking + // segments as preserved for observation, making IsEmpty a cheaper way than either + // TryPeek(out T) or Count == 0 to check whether any elements are in the queue. + T ignoredResult; + return !TryPeek(out ignoredResult, resultUsed: false); } } - /// - /// Copies the elements stored in the to a new array. - /// - /// A new array containing a snapshot of elements copied from the . + /// Copies the elements stored in the to a new array. + /// A new array containing a snapshot of elements copied from the . public T[] ToArray() { - return ToList().ToArray(); - } + // Snap the current contents for enumeration. + Segment head, tail; + int headHead, tailTail; + SnapForObservation(out head, out headHead, out tail, out tailTail); - /// - /// Copies the elements to a new . - /// - /// A new containing a snapshot of - /// elements copied from the . - private List ToList() - { - // Increments the number of active snapshot takers. This increment must happen before the snapshot is - // taken. At the same time, Decrement must happen after list copying is over. Only in this way, can it - // eliminate race condition when Segment.TryRemove() checks whether m_numSnapshotTakers == 0. - Interlocked.Increment(ref m_numSnapshotTakers); + // Count the number of items in that snapped set, and use it to allocate an + // array of the right size. + long count = GetCount(head, headHead, tail, tailTail); + T[] arr = new T[count]; - List list = new List(); - try + // Now enumerate the contents, copying each element into the array. + using (IEnumerator e = Enumerate(head, headHead, tail, tailTail)) { - //store head and tail positions in buffer, - Segment head, tail; - int headLow, tailHigh; - GetHeadTailPositions(out head, out tail, out headLow, out tailHigh); - - if (head == tail) + int i = 0; + while (e.MoveNext()) { - head.AddToList(list, headLow, tailHigh); - } - else - { - head.AddToList(list, headLow, SEGMENT_SIZE - 1); - Segment curr = head.Next; - while (curr != tail) - { - curr.AddToList(list, 0, SEGMENT_SIZE - 1); - curr = curr.Next; - } - //Add tail segment - tail.AddToList(list, 0, tailHigh); + arr[i++] = e.Current; } + Debug.Assert(count == i); } - finally - { - // This Decrement must happen after copying is over. - Interlocked.Decrement(ref m_numSnapshotTakers); - } - return list; - } - /// - /// Store the position of the current head and tail positions. 
- /// - /// return the head segment - /// return the tail segment - /// return the head offset, value range [0, SEGMENT_SIZE] - /// return the tail offset, value range [-1, SEGMENT_SIZE-1] - private void GetHeadTailPositions(out Segment head, out Segment tail, - out int headLow, out int tailHigh) - { - head = m_head; - tail = m_tail; - headLow = head.Low; - tailHigh = tail.High; - SpinWait spin = new SpinWait(); - - //we loop until the observed values are stable and sensible. - //This ensures that any update order by other methods can be tolerated. - while ( - //if head and tail changed, retry - head != m_head || tail != m_tail - //if low and high pointers, retry - || headLow != head.Low || tailHigh != tail.High - //if head jumps ahead of tail because of concurrent grow and dequeue, retry - || head.m_index > tail.m_index) - { - spin.SpinOnce(); - head = m_head; - tail = m_tail; - headLow = head.Low; - tailHigh = tail.High; - } + // And return it. + return arr; } - /// /// Gets the number of elements contained in the . /// @@ -391,38 +306,140 @@ namespace System.Collections.Concurrent { get { - //store head and tail positions in buffer, Segment head, tail; - int headLow, tailHigh; - GetHeadTailPositions(out head, out tail, out headLow, out tailHigh); - - if (head == tail) + int headHead, headTail, tailHead, tailTail; + var spinner = new SpinWait(); + while (true) { - return tailHigh - headLow + 1; + // Capture the head and tail, as well as the head's head and tail. + head = _head; + tail = _tail; + headHead = Volatile.Read(ref head._headAndTail.Head); + headTail = Volatile.Read(ref head._headAndTail.Tail); + + if (head == tail) + { + // There was a single segment in the queue. If the captured + // values still (or again) represent reality, return the segment's + // count. A single segment should be the most common case once the + // queue's size has stabilized after segments have grown to + // the point where growing is no longer needed. + if (head == _head && + head == _tail && + headHead == Volatile.Read(ref head._headAndTail.Head) && + headTail == Volatile.Read(ref head._headAndTail.Tail)) + { + return GetCount(head, headHead, headTail); + } + } + else if (head._nextSegment == tail) + { + // There were two segments in the queue. Get the positions + // from the tail, and if the captured values still (or again) match + // reality, return the sum of the counts from both segments. + tailHead = Volatile.Read(ref tail._headAndTail.Head); + tailTail = Volatile.Read(ref tail._headAndTail.Tail); + if (head == _head && + tail == _tail && + headHead == Volatile.Read(ref head._headAndTail.Head) && + headTail == Volatile.Read(ref head._headAndTail.Tail) && + tailHead == Volatile.Read(ref tail._headAndTail.Head) && + tailTail == Volatile.Read(ref tail._headAndTail.Tail)) + { + // We got stable values, so we can just compute the sizes based on those + // values and return the sum of the counts of the segments. + return GetCount(head, headHead, headTail) + GetCount(tail, tailHead, tailTail); + } + } + else + { + // There were more than two segments. Take the slower path, where we freeze the + // queue and then count the now stable segments. + SnapForObservation(out head, out headHead, out tail, out tailTail); + return unchecked((int)GetCount(head, headHead, tail, tailTail)); + } + + // We raced with enqueues/dequeues and captured an inconsistent picture of the queue. + // Spin and try again. 
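                    // (Concretely: a concurrent enqueue or dequeue that moves one of these indices,
                    // or a grow that swaps in a new _tail, between the first reads and the re-reads
                    // in the checks above is enough to send us around this loop for another attempt.)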
+ spinner.SpinOnce(); } + } + } - //head segment - int count = SEGMENT_SIZE - headLow; + /// Computes the number of items in a segment based on a fixed head and tail in that segment. + private static int GetCount(Segment s, int head, int tail) + { + if (head != tail && head != tail - s.FreezeOffset) + { + head &= s._slotsMask; + tail &= s._slotsMask; + return head < tail ? tail - head : s._slots.Length - head + tail; + } + return 0; + } - //middle segment(s), if any, are full. - //We don't deal with overflow to be consistent with the behavior of generic types in CLR. - count += SEGMENT_SIZE * ((int)(tail.m_index - head.m_index - 1)); + /// Gets the number of items in snapped region. + private static long GetCount(Segment head, int headHead, Segment tail, int tailTail) + { + // All of the segments should have been both frozen for enqueues and preserved for observation. + // Validate that here for head and tail; we'll validate it for intermediate segments later. + Debug.Assert(head._preservedForObservation); + Debug.Assert(head._frozenForEnqueues); + Debug.Assert(tail._preservedForObservation); + Debug.Assert(tail._frozenForEnqueues); + + long count = 0; + + // Head segment. We've already marked it as frozen for enqueues, so its tail position is fixed, + // and we've already marked it as preserved for observation (before we grabbed the head), so we + // can safely enumerate from its head to its tail and access its elements. + int headTail = (head == tail ? tailTail : Volatile.Read(ref head._headAndTail.Tail)) - head.FreezeOffset; + if (headHead < headTail) + { + // Mask the head and tail for the head segment + headHead &= head._slotsMask; + headTail &= head._slotsMask; + + // Increase the count by either the one or two regions, based on whether tail + // has wrapped to be less than head. + count += headHead < headTail ? + headTail - headHead : + head._slots.Length - headHead + headTail; + } - //tail segment - count += tailHigh + 1; + // We've enumerated the head. If the tail is different from the head, we need to + // enumerate the remaining segments. + if (head != tail) + { + // Count the contents of each segment between head and tail, not including head and tail. + // Since there were segments before these, for our purposes we consider them to start at + // the 0th element, and since there is at least one segment after each, each was frozen + // by the time we snapped it, so we can iterate until each's frozen tail. + for (Segment s = head._nextSegment; s != tail; s = s._nextSegment) + { + Debug.Assert(s._preservedForObservation); + Debug.Assert(s._frozenForEnqueues); + count += s._headAndTail.Tail - s.FreezeOffset; + } - return count; + // Finally, enumerate the tail. As with the intermediate segments, there were segments + // before this in the snapped region, so we can start counting from the beginning. Unlike + // the intermediate segments, we can't just go until the Tail, as that could still be changing; + // instead we need to go until the tail we snapped for observation. + count += tailTail - tail.FreezeOffset; } - } + // Return the computed count. + return count; + } /// /// Copies the elements to an existing one-dimensional Array, starting at the specified array index. + /// cref="Array">Array, starting at the specified array index. /// - /// The one-dimensional Array that is the + /// The one-dimensional Array that is the /// destination of the elements copied from the - /// . The Array must have zero-based + /// . The Array must have zero-based /// indexing. 
/// The zero-based index in at which copying /// begins. @@ -442,19 +459,36 @@ namespace System.Collections.Concurrent { throw new ArgumentNullException(nameof(array)); } + if (index < 0) + { + throw new ArgumentOutOfRangeException(nameof(index)); + } - // We must be careful not to corrupt the array, so we will first accumulate an - // internal list of elements that we will then copy to the array. This requires - // some extra allocation, but is necessary since we don't know up front whether - // the array is sufficiently large to hold the stack's contents. - ToList().CopyTo(array, index); - } + // Snap for enumeration + Segment head, tail; + int headHead, tailTail; + SnapForObservation(out head, out headHead, out tail, out tailTail); + // Get the number of items to be enumerated + long count = GetCount(head, headHead, tail, tailTail); + if (index > array.Length - count) + { + throw new ArgumentException(); // TODO: finish this + } - /// - /// Returns an enumerator that iterates through the . - /// + // Copy the items to the target array + int i = index; + using (IEnumerator e = Enumerate(head, headHead, tail, tailTail)) + { + while (e.MoveNext()) + { + array[i++] = e.Current; + } + } + Debug.Assert(count == i - index); + } + + /// Returns an enumerator that iterates through the . /// An enumerator for the contents of the . /// @@ -465,124 +499,195 @@ namespace System.Collections.Concurrent /// public IEnumerator GetEnumerator() { - // Increments the number of active snapshot takers. This increment must happen before the snapshot is - // taken. At the same time, Decrement must happen after the enumeration is over. Only in this way, can it - // eliminate race condition when Segment.TryRemove() checks whether m_numSnapshotTakers == 0. - Interlocked.Increment(ref m_numSnapshotTakers); - - // Takes a snapshot of the queue. - // A design flaw here: if a Thread.Abort() happens, we cannot decrement m_numSnapshotTakers. But we cannot - // wrap the following with a try/finally block, otherwise the decrement will happen before the yield return - // statements in the GetEnumerator (head, tail, headLow, tailHigh) method. Segment head, tail; - int headLow, tailHigh; - GetHeadTailPositions(out head, out tail, out headLow, out tailHigh); - - //If we put yield-return here, the iterator will be lazily evaluated. As a result a snapshot of - // the queue is not taken when GetEnumerator is initialized but when MoveNext() is first called. - // This is inconsistent with existing generic collections. In order to prevent it, we capture the - // value of m_head in a buffer and call out to a helper method. - //The old way of doing this was to return the ToList().GetEnumerator(), but ToList() was an - // unnecessary perfomance hit. - return GetEnumerator(head, tail, headLow, tailHigh); + int headHead, tailTail; + SnapForObservation(out head, out headHead, out tail, out tailTail); + return Enumerate(head, headHead, tail, tailTail); } /// - /// Helper method of GetEnumerator to seperate out yield return statement, and prevent lazy evaluation. + /// Gets the head and tail information of the current contents of the queue. + /// After this call returns, the specified region can be enumerated any number + /// of times and will not change. 
/// - private IEnumerator GetEnumerator(Segment head, Segment tail, int headLow, int tailHigh) + private void SnapForObservation(out Segment head, out int headHead, out Segment tail, out int tailTail) + { + lock (_crossSegmentLock) // _head and _tail may only change while the lock is held. + { + // Snap the head and tail + head = _head; + tail = _tail; + Debug.Assert(head != null); + Debug.Assert(tail != null); + Debug.Assert(tail._nextSegment == null); + + // Mark them and all segments in between as preserving, and ensure no additional items + // can be added to the tail. + for (Segment s = head; ; s = s._nextSegment) + { + s._preservedForObservation = true; + if (s == tail) break; + Debug.Assert(s._frozenForEnqueues); // any non-tail should already be marked + } + tail.EnsureFrozenForEnqueues(); // we want to prevent the tailTail from moving + + // At this point, any dequeues from any segment won't overwrite the value, and + // none of the existing segments can have new items enqueued. + + headHead = Volatile.Read(ref head._headAndTail.Head); + tailTail = Volatile.Read(ref tail._headAndTail.Tail); + } + } + + /// Gets the item stored in the th entry in . + private T GetItemWhenAvailable(Segment segment, int i) { - try + Debug.Assert(segment._preservedForObservation); + + // Get the expected value for the sequence number + int expectedSequenceNumberAndMask = (i + 1) & segment._slotsMask; + + // If the expected sequence number is not yet written, we're still waiting for + // an enqueuer to finish storing it. Spin until it's there. + if ((segment._slots[i].SequenceNumber & segment._slotsMask) != expectedSequenceNumberAndMask) { - SpinWait spin = new SpinWait(); + var spinner = new SpinWait(); + while ((Volatile.Read(ref segment._slots[i].SequenceNumber) & segment._slotsMask) != expectedSequenceNumberAndMask) + { + spinner.SpinOnce(); + } + } - if (head == tail) + // Return the value from the slot. + return segment._slots[i].Item; + } + + private IEnumerator Enumerate(Segment head, int headHead, Segment tail, int tailTail) + { + Debug.Assert(head._preservedForObservation); + Debug.Assert(head._frozenForEnqueues); + Debug.Assert(tail._preservedForObservation); + Debug.Assert(tail._frozenForEnqueues); + + // Head segment. We've already marked it as not accepting any more enqueues, + // so its tail position is fixed, and we've already marked it as preserved for + // enumeration (before we grabbed its head), so we can safely enumerate from + // its head to its tail. + int headTail = (head == tail ? tailTail : Volatile.Read(ref head._headAndTail.Tail)) - head.FreezeOffset; + if (headHead < headTail) + { + headHead &= head._slotsMask; + headTail &= head._slotsMask; + + if (headHead < headTail) { - for (int i = headLow; i <= tailHigh; i++) - { - // If the position is reserved by an Enqueue operation, but the value is not written into, - // spin until the value is available. - spin.Reset(); - while (!head.m_state[i].m_value) - { - spin.SpinOnce(); - } - yield return head.m_array[i]; - } + for (int i = headHead; i < headTail; i++) yield return GetItemWhenAvailable(head, i); } else { - //iterate on head segment - for (int i = headLow; i < SEGMENT_SIZE; i++) - { - // If the position is reserved by an Enqueue operation, but the value is not written into, - // spin until the value is available. 
- spin.Reset(); - while (!head.m_state[i].m_value) - { - spin.SpinOnce(); - } - yield return head.m_array[i]; - } - //iterate on middle segments - Segment curr = head.Next; - while (curr != tail) - { - for (int i = 0; i < SEGMENT_SIZE; i++) - { - // If the position is reserved by an Enqueue operation, but the value is not written into, - // spin until the value is available. - spin.Reset(); - while (!curr.m_state[i].m_value) - { - spin.SpinOnce(); - } - yield return curr.m_array[i]; - } - curr = curr.Next; - } + for (int i = headHead; i < head._slots.Length; i++) yield return GetItemWhenAvailable(head, i); + for (int i = 0; i < headTail; i++) yield return GetItemWhenAvailable(head, i); + } + } + + // We've enumerated the head. If the tail is the same, we're done. + if (head != tail) + { + // Each segment between head and tail, not including head and tail. Since there were + // segments before these, for our purposes we consider it to start at the 0th element. + for (Segment s = head._nextSegment; s != tail; s = s._nextSegment) + { + Debug.Assert(s._preservedForObservation, "Would have had to been preserved as a segment part of enumeration"); + Debug.Assert(s._frozenForEnqueues, "Would have had to be frozen for enqueues as it's intermediate"); - //iterate on tail segment - for (int i = 0; i <= tailHigh; i++) + int sTail = s._headAndTail.Tail - s.FreezeOffset; + for (int i = 0; i < sTail; i++) { - // If the position is reserved by an Enqueue operation, but the value is not written into, - // spin until the value is available. - spin.Reset(); - while (!tail.m_state[i].m_value) - { - spin.SpinOnce(); - } - yield return tail.m_array[i]; + yield return GetItemWhenAvailable(s, i); } } - } - finally - { - // This Decrement must happen after the enumeration is over. - Interlocked.Decrement(ref m_numSnapshotTakers); + + // Enumerate the tail. Since there were segments before this, we can just start at + // its beginning, and iterate until the tail we already grabbed. + tailTail -= tail.FreezeOffset; + for (int i = 0; i < tailTail; i++) + { + yield return GetItemWhenAvailable(tail, i); + } } } - /// - /// Adds an object to the end of the . - /// - /// The object to add to the end of the . The value can be a null reference - /// (Nothing in Visual Basic) for reference types. + /// Round the specified value up to the next power of 2, if it isn't one already. + private static int RoundUpToPowerOf2(int i) + { + --i; + i |= i >> 1; + i |= i >> 2; + i |= i >> 4; + i |= i >> 8; + i |= i >> 16; + return i + 1; + } + + /// Adds an object to the end of the . + /// + /// The object to add to the end of the . + /// The value can be a null reference (Nothing in Visual Basic) for reference types. /// public void Enqueue(T item) { - SpinWait spin = new SpinWait(); + // Try to enqueue to the current tail. + if (!_tail.TryEnqueue(item)) + { + // If we're unable to, we need to take a slow path that will + // try to add a new tail segment. + EnqueueSlow(item); + } + } + + /// Adds to the end of the queue, adding a new segment if necessary. + private void EnqueueSlow(T item) + { while (true) { - Segment tail = m_tail; - if (tail.TryAppend(item)) + Segment tail = _tail; + + // Try to append to the existing tail. + if (tail.TryEnqueue(item)) + { return; - spin.SpinOnce(); + } + + // If we were unsuccessful, take the lock so that we can compare and manipulate + // the tail. Assuming another enqueuer hasn't already added a new segment, + // do so, then loop around to try enqueueing again. 
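            // (Sizing note, based on the nextSize computation below: starting from the default
            // 32-slot segment, repeated growth yields segments of 64, 128, 256, ... slots,
            // doubling each time; if the current tail was preserved for observation, the next
            // segment instead falls back to InitialSegmentLength.)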
+ lock (_crossSegmentLock) + { + if (tail == _tail) + { + // Make sure no one else can enqueue to this segment. + tail.EnsureFrozenForEnqueues(); + + // We determine the new segment's length based on the old length. + // In general, we double the size of the segment, to make it less likely + // that we'll need to grow again. However, if the tail segment is marked + // as preserved for observation, something caused us to avoid reusing this + // segment, and if that happens a lot and we grow, we'll end up allocating + // lots of wasted space. As such, in such situations we reset back to the + // initial segment length; if these observations are happening frequently, + // this will help to avoid wasted memory, and if they're not, we'll + // relatively quickly grow again to a larger size. + int nextSize = tail._preservedForObservation ? InitialSegmentLength : tail.Capacity * 2; + var newTail = new Segment(nextSize); + + // Hook up the new tail. + tail._nextSegment = newTail; + _tail = newTail; + } + } } } - /// /// Attempts to remove and return the object at the beginning of the . @@ -591,369 +696,453 @@ namespace System.Collections.Concurrent /// When this method returns, if the operation was successful, contains the /// object removed. If no object was available to be removed, the value is unspecified. /// - /// true if an element was removed and returned from the beggining of the - /// succesfully; otherwise, false. - public bool TryDequeue(out T result) + /// + /// true if an element was removed and returned from the beginning of the + /// successfully; otherwise, false. + /// + public bool TryDequeue(out T result) => + _head.TryDequeue(out result) || // fast-path that operates just on the head segment + TryDequeueSlow(out result); // slow path that needs to fix up segments + + /// Tries to dequeue an item, removing empty segments as needed. + private bool TryDequeueSlow(out T item) { - while (!IsEmpty) + while (true) { - Segment head = m_head; - if (head.TryRemove(out result)) + // Get the current head + Segment head = _head; + + // Try to take. If we're successful, we're done. + if (head.TryDequeue(out item)) + { + return true; + } + + // Check to see whether this segment is the last. If it is, we can consider + // this to be a moment-in-time empty condition (even though between the TryDequeue + // check and this check, another item could have arrived). + if (head._nextSegment == null) + { + item = default(T); + return false; + } + + // At this point we know that head.Next != null, which means + // this segment has been frozen for additional enqueues. But between + // the time that we ran TryDequeue and checked for a next segment, + // another item could have been added. Try to dequeue one more time + // to confirm that the segment is indeed empty. + Debug.Assert(head._frozenForEnqueues); + if (head.TryDequeue(out item)) + { return true; - //since method IsEmpty spins, we don't need to spin in the while loop + } + + // This segment is frozen (nothing more can be added) and empty (nothing is in it). + // Update head to point to the next segment in the list, assuming no one's beat us to it. + lock (_crossSegmentLock) + { + if (head == _head) + { + _head = head._nextSegment; + } + } } - result = default(T); - return false; } /// /// Attempts to return an object from the beginning of the /// without removing it. /// - /// When this method returns, contains an object from - /// the beginning of the or an - /// unspecified value if the operation failed. 
+ /// + /// When this method returns, contains an object from + /// the beginning of the or default(T) + /// if the operation failed. + /// /// true if and object was returned successfully; otherwise, false. - public bool TryPeek(out T result) - { - Interlocked.Increment(ref m_numSnapshotTakers); + /// + /// For determining whether the collection contains any items, use of the + /// property is recommended rather than peeking. + /// + public bool TryPeek(out T result) => TryPeek(out result, resultUsed: true); - while (!IsEmpty) + /// Attempts to retrieve the value for the first element in the queue. + /// The value of the first element, if found. + /// true if the result is neede; otherwise false if only the true/false outcome is needed. + /// true if an element was found; otherwise, false. + private bool TryPeek(out T result, bool resultUsed) + { + // Starting with the head segment, look through all of the segments + // for the first one we can find that's not empty. + Segment s = _head; + while (true) { - Segment head = m_head; - if (head.TryPeek(out result)) + // Grab the next segment from this one, before we peek. + // This is to be able to see whether the value has changed + // during the peek operation. + Segment next = Volatile.Read(ref s._nextSegment); + + // Peek at the segment. If we find an element, we're done. + if (s.TryPeek(out result, resultUsed)) { - Interlocked.Decrement(ref m_numSnapshotTakers); return true; } - //since method IsEmpty spins, we don't need to spin in the while loop + + // The current segment was empty at the moment we checked. + + if (next != null) + { + // If prior to the peek there was already a next segment, then + // during the peek no additional items could have been enqueued + // to it and we can just move on to check the next segment. + Debug.Assert(next == s._nextSegment); + s = next; + } + else if (Volatile.Read(ref s._nextSegment) == null) + { + // The next segment is null. Nothing more to peek at. + break; + } + + // The next segment was null before we peeked but non-null after. + // That means either when we peeked the first segment had + // already been frozen but the new segment not yet added, + // or that the first segment was empty and between the time + // that we peeked and then checked _nextSegment, so many items + // were enqueued that we filled the first segment and went + // into the next. Since we need to peek in order, we simply + // loop around again to peek on the same segment. The next + // time around on this segment we'll then either successfully + // peek or we'll find that next was non-null before peeking, + // and we'll traverse to that segment. } + result = default(T); - Interlocked.Decrement(ref m_numSnapshotTakers); return false; } - /// - /// private class for ConcurrentQueue. - /// a queue is a linked list of small arrays, each node is called a segment. - /// A segment contains an array, a pointer to the next segment, and m_low, m_high indices recording - /// the first and last valid elements of the array. + /// Removes all objects from the . /// - private class Segment + public void Clear() { - //we define two volatile arrays: m_array and m_state. Note that the accesses to the array items - //do not get volatile treatment. But we don't need to worry about loading adjacent elements or - //store/load on adjacent elements would suffer reordering. - // - Two stores: these are at risk, but CLRv2 memory model guarantees store-release hence we are safe. 
- // - Two loads: because one item from two volatile arrays are accessed, the loads of the array references - // are sufficient to prevent reordering of the loads of the elements. - internal volatile T[] m_array; - - // For each entry in m_array, the corresponding entry in m_state indicates whether this position contains - // a valid value. m_state is initially all false. - internal volatile VolatileBool[] m_state; - - //pointer to the next segment. null if the current segment is the last segment - private volatile Segment m_next; - - //We use this zero based index to track how many segments have been created for the queue, and - //to compute how many active segments are there currently. - // * The number of currently active segments is : m_tail.m_index - m_head.m_index + 1; - // * m_index is incremented with every Segment.Grow operation. We use Int64 type, and we can safely - // assume that it never overflows. To overflow, we need to do 2^63 increments, even at a rate of 4 - // billion (2^32) increments per second, it takes 2^31 seconds, which is about 64 years. - internal readonly long m_index; - - //indices of where the first and last valid values - // - m_low points to the position of the next element to pop from this segment, range [0, infinity) - // m_low >= SEGMENT_SIZE implies the segment is disposable - // - m_high points to the position of the latest pushed element, range [-1, infinity) - // m_high == -1 implies the segment is new and empty - // m_high >= SEGMENT_SIZE-1 means this segment is ready to grow. - // and the thread who sets m_high to SEGMENT_SIZE-1 is responsible to grow the segment - // - Math.Min(m_low, SEGMENT_SIZE) > Math.Min(m_high, SEGMENT_SIZE-1) implies segment is empty - // - initially m_low =0 and m_high=-1; - private volatile int m_low; - private volatile int m_high; - - private volatile ConcurrentQueue m_source; - - /// - /// Create and initialize a segment with the specified index. - /// - internal Segment(long index, ConcurrentQueue source) - { - m_array = new T[SEGMENT_SIZE]; - m_state = new VolatileBool[SEGMENT_SIZE]; //all initialized to false - m_high = -1; - Debug.Assert(index >= 0); - m_index = index; - m_source = source; - } - - /// - /// return the next segment - /// - internal Segment Next - { - get { return m_next; } - } - - - /// - /// return true if the current segment is empty (doesn't have any element available to dequeue, - /// false otherwise - /// - internal bool IsEmpty + lock (_crossSegmentLock) { - get { return (Low > High); } - } - - /// - /// Add an element to the tail of the current segment - /// exclusively called by ConcurrentQueue.InitializedFromCollection - /// InitializeFromCollection is responsible to guaratee that there is no index overflow, - /// and there is no contention - /// - /// - internal void UnsafeAdd(T value) - { - Debug.Assert(m_high < SEGMENT_SIZE - 1); - m_high++; - m_array[m_high] = value; - m_state[m_high].m_value = true; + // Simply substitute a new segment for the existing head/tail, + // as is done in the constructor. Operations currently in flight + // may still read from or write to an existing segment that's + // getting dropped, meaning that in flight operations may not be + // linear with regards to this clear operation. To help mitigate + // in-flight operations enqueuing onto the tail that's about to + // be dropped, we first freeze it; that'll force enqueuers to take + // this lock to synchronize and see the new tail. 
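            // (Put differently: an enqueue racing with this Clear either completed against the old
            // segments, in which case its item is dropped along with them, or it fails against the
            // tail frozen below and retries via EnqueueSlow, which then observes the replacement
            // segment created here.)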
+ _tail.EnsureFrozenForEnqueues(); + _tail = _head = new Segment(InitialSegmentLength); } + } - /// - /// Create a new segment and append to the current one - /// Does not update the m_tail pointer - /// exclusively called by ConcurrentQueue.InitializedFromCollection - /// InitializeFromCollection is responsible to guaratee that there is no index overflow, - /// and there is no contention - /// - /// the reference to the new Segment - internal Segment UnsafeGrow() + /// + /// Provides a multi-producer, multi-consumer thread-safe bounded segment. When the queue is full, + /// enqueues fail and return false. When the queue is empty, dequeues fail and return null. + /// These segments are linked together to form the unbounded . + /// + [DebuggerDisplay("Capacity = {Capacity}")] + private sealed class Segment + { + // Segment design is inspired by the algorithm outlined at: + // http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue + + /// The array of items in this queue. Each slot contains the item in that slot and its "sequence number". + internal readonly Slot[] _slots; + /// Mask for quickly accessing a position within the queue's array. + internal readonly int _slotsMask; + /// The head and tail positions, with padding to help avoid false sharing contention. + /// Dequeueing happens from the head, enqueueing happens at the tail. + internal PaddedHeadAndTail _headAndTail; // mutable struct: do not make this readonly + + /// Indicates whether the segment has been marked such that dequeues don't overwrite the removed data. + internal bool _preservedForObservation; + /// Indicates whether the segment has been marked such that no additional items may be enqueued. + internal bool _frozenForEnqueues; + /// The segment following this one in the queue, or null if this segment is the last in the queue. + internal Segment _nextSegment; + + /// Creates the segment. + /// + /// The maximum number of elements the segment can contain. Must be a power of 2. + /// + public Segment(int boundedLength) { - Debug.Assert(m_high >= SEGMENT_SIZE - 1); - Segment newSegment = new Segment(m_index + 1, m_source); //m_index is Int64, we don't need to worry about overflow - m_next = newSegment; - return newSegment; + // Validate the length + Debug.Assert(boundedLength >= 2, $"Must be >= 2, got {boundedLength}"); + Debug.Assert((boundedLength & (boundedLength - 1)) == 0, $"Must be a power of 2, got {boundedLength}"); + + // Initialize the slots and the mask. The mask is used as a way of quickly doing "% _slots.Length", + // instead letting us do "& _slotsMask". + _slots = new Slot[boundedLength]; + _slotsMask = boundedLength - 1; + + // Initialize the sequence number for each slot. The sequence number provides a ticket that + // allows dequeuers to know whether they can dequeue and enqueuers to know whether they can + // enqueue. An enqueuer at position N can enqueue when the sequence number is N, and a dequeuer + // for position N can dequeue when the sequence number is N + 1. When an enqueuer is done writing + // at position N, it sets the sequence number to N so that a dequeuer will be able to dequeue, + // and when a dequeuer is done dequeueing at position N, it sets the sequence number to N + _slots.Length, + // so that when an enqueuer loops around the slots, it'll find that the sequence number at + // position N is N. This also means that when an enqueuer finds that at position N the sequence + // number is < N, there is still a value in that slot, i.e. 
the segment is full, and when a + // dequeuer finds that the value in a slot is < N + 1, there is nothing currently available to + // dequeue. (It is possible for multiple enqueuers to enqueue concurrently, writing into + // subsequent slots, and to have the first enqueuer take longer, so that the slots for 1, 2, 3, etc. + // may have values, but the 0th slot may still be being filled... in that case, TryDequeue will + // return false.) + for (int i = 0; i < _slots.Length; i++) + { + _slots[i].SequenceNumber = i; + } } - /// - /// Create a new segment and append to the current one - /// Update the m_tail pointer - /// This method is called when there is no contention - /// - internal void Grow() - { - //no CAS is needed, since there is no contention (other threads are blocked, busy waiting) - Segment newSegment = new Segment(m_index + 1, m_source); //m_index is Int64, we don't need to worry about overflow - m_next = newSegment; - Debug.Assert(m_source.m_tail == this); - m_source.m_tail = m_next; - } + /// Gets the number of elements this segment can store. + internal int Capacity => _slots.Length; + /// Gets the "freeze offset" for this segment. + internal int FreezeOffset => _slots.Length * 2; /// - /// Try to append an element at the end of this segment. + /// Ensures that the segment will not accept any subsequent enqueues that aren't already underway. /// - /// the element to append - /// The tail. - /// true if the element is appended, false if the current segment is full - /// if appending the specified element succeeds, and after which the segment is full, - /// then grow the segment - internal bool TryAppend(T value) + /// + /// When we mark a segment as being frozen for additional enqueues, + /// we set the bool, but that's mostly + /// as a small helper to avoid marking it twice. The real marking comes + /// by modifying the Tail for the segment, increasing it by this + /// . This effectively knocks it off the + /// sequence expected by future enqueuers, such that any additional enqueuer + /// will be unable to enqueue due to it not lining up with the expected + /// sequence numbers. This value is chosen specially so that Tail will grow + /// to a value that maps to the same slot but that won't be confused with + /// any other enqueue/dequeue sequence number. + /// + internal void EnsureFrozenForEnqueues() // must only be called while queue's segment lock is held { - //quickly check if m_high is already over the boundary, if so, bail out - if (m_high >= SEGMENT_SIZE - 1) + if (!_frozenForEnqueues) // flag used to ensure we don't increase the Tail more than once if frozen more than once { - return false; - } + _frozenForEnqueues = true; - //Now we will use a CAS to increment m_high, and store the result in newhigh. - //Depending on how many free spots left in this segment and how many threads are doing this Increment - //at this time, the returning "newhigh" can be - // 1) < SEGMENT_SIZE - 1 : we took a spot in this segment, and not the last one, just insert the value - // 2) == SEGMENT_SIZE - 1 : we took the last spot, insert the value AND grow the segment - // 3) > SEGMENT_SIZE - 1 : we failed to reserve a spot in this segment, we return false to - // Queue.Enqueue method, telling it to try again in the next segment. - - int newhigh = SEGMENT_SIZE; //initial value set to be over the boundary - - //We need do Interlocked.Increment and value/state update in a finally block to ensure that they run - //without interuption. 
This is to prevent anything from happening between them, and another dequeue - //thread maybe spinning forever to wait for m_state[] to be true; - try - { } - finally - { - newhigh = Interlocked.Increment(ref m_high); - if (newhigh <= SEGMENT_SIZE - 1) + // Increase the tail by FreezeOffset, spinning until we're successful in doing so. + var spinner = new SpinWait(); + while (true) { - m_array[newhigh] = value; - m_state[newhigh].m_value = true; - } - - //if this thread takes up the last slot in the segment, then this thread is responsible - //to grow a new segment. Calling Grow must be in the finally block too for reliability reason: - //if thread abort during Grow, other threads will be left busy spinning forever. - if (newhigh == SEGMENT_SIZE - 1) - { - Grow(); + int tail = Volatile.Read(ref _headAndTail.Tail); + if (Interlocked.CompareExchange(ref _headAndTail.Tail, tail + FreezeOffset, tail) == tail) + { + break; + } + spinner.SpinOnce(); } } - - //if newhigh <= SEGMENT_SIZE-1, it means the current thread successfully takes up a spot - return newhigh <= SEGMENT_SIZE - 1; } - - /// - /// try to remove an element from the head of current segment - /// - /// The result. - /// The head. - /// return false only if the current segment is empty - internal bool TryRemove(out T result) + /// Tries to dequeue an element from the queue. + public bool TryDequeue(out T item) { - SpinWait spin = new SpinWait(); - int lowLocal = Low, highLocal = High; - while (lowLocal <= highLocal) + // Loop in case of contention... + var spinner = new SpinWait(); + while (true) { - //try to update m_low - if (Interlocked.CompareExchange(ref m_low, lowLocal + 1, lowLocal) == lowLocal) - { - //if the specified value is not available (this spot is taken by a push operation, - // but the value is not written into yet), then spin - SpinWait spinLocal = new SpinWait(); - while (!m_state[lowLocal].m_value) - { - spinLocal.SpinOnce(); - } - result = m_array[lowLocal]; + // Get the head at which to try to dequeue. + int currentHead = Volatile.Read(ref _headAndTail.Head); + int slotsIndex = currentHead & _slotsMask; - // If there is no other thread taking snapshot (GetEnumerator(), ToList(), etc), reset the deleted entry to null. - // It is ok if after this conditional check m_numSnapshotTakers becomes > 0, because new snapshots won't include - // the deleted entry at m_array[lowLocal]. - if (m_source.m_numSnapshotTakers <= 0) - { - m_array[lowLocal] = default(T); //release the reference to the object. - } + // Read the sequence number for the head position. + int sequenceNumber = Volatile.Read(ref _slots[slotsIndex].SequenceNumber); - //if the current thread sets m_low to SEGMENT_SIZE, which means the current segment becomes - //disposable, then this thread is responsible to dispose this segment, and reset m_head - if (lowLocal + 1 >= SEGMENT_SIZE) + // We can dequeue from this slot if it's been filled by an enqueuer, which + // would have left the sequence number at pos+1. + if (sequenceNumber == currentHead + 1) + { + // We may be racing with other dequeuers. Try to reserve the slot by incrementing + // the head. Once we've done that, no one else will be able to read from this slot, + // and no enqueuer will be able to read from this slot until we've written the new + // sequence number. WARNING: The next few lines are not reliable on a runtime that + // supports thread aborts. 
If a thread abort were to sneak in after the CompareExchange + // but before the Volatile.Write, enqueuers trying to enqueue into this slot would + // spin indefinitely. If this implementation is ever used on such a platform, this + // if block should be wrapped in a finally / prepared region. + if (Interlocked.CompareExchange(ref _headAndTail.Head, currentHead + 1, currentHead) == currentHead) { - // Invariant: we only dispose the current m_head, not any other segment - // In usual situation, disposing a segment is simply seting m_head to m_head.m_next - // But there is one special case, where m_head and m_tail points to the same and ONLY - //segment of the queue: Another thread A is doing Enqueue and finds that it needs to grow, - //while the *current* thread is doing *this* Dequeue operation, and finds that it needs to - //dispose the current (and ONLY) segment. Then we need to wait till thread A finishes its - //Grow operation, this is the reason of having the following while loop - spinLocal = new SpinWait(); - while (m_next == null) + // Successfully reserved the slot. Note that after the above CompareExchange, other threads + // trying to dequeue from this slot will end up spinning until we do the subsequent Write. + item = _slots[slotsIndex].Item; + if (!Volatile.Read(ref _preservedForObservation)) { - spinLocal.SpinOnce(); + // If we're preserving, though, we don't zero out the slot, as we need it for + // enumerations, peeking, ToArray, etc. And we don't update the sequence number, + // so that an enqueuer will see it as full and be forced to move to a new segment. + _slots[slotsIndex].Item = default(T); + Volatile.Write(ref _slots[slotsIndex].SequenceNumber, currentHead + _slots.Length); } - Debug.Assert(m_source.m_head == this); - m_source.m_head = m_next; + return true; } - return true; } - else + else if (sequenceNumber < currentHead + 1) { - //CAS failed due to contention: spin briefly and retry - spin.SpinOnce(); - lowLocal = Low; highLocal = High; + // The sequence number was less than what we needed, which means this slot doesn't + // yet contain a value we can dequeue, i.e. the segment is empty. Technically it's + // possible that multiple enqueuers could have written concurrently, with those + // getting later slots actually finishing first, so there could be elements after + // this one that are available, but we need to dequeue in order. So before declaring + // failure and that the segment is empty, we check the tail to see if we're actually + // empty or if we're just waiting for items in flight or after this one to become available. + bool frozen = _frozenForEnqueues; + int currentTail = Volatile.Read(ref _headAndTail.Tail); + if (currentTail - currentHead <= 0 || (frozen && (currentTail - FreezeOffset - currentHead <= 0))) + { + item = default(T); + return false; + } + + // It's possible it could have become frozen after we checked _frozenForEnqueues + // and before reading the tail. That's ok: in that rare race condition, we just + // loop around again. } - }//end of while - result = default(T); - return false; + + // Lost a race. Spin a bit, then try again. + spinner.SpinOnce(); + } } - /// - /// try to peek the current segment - /// - /// holds the return value of the element at the head position, - /// value set to default(T) if there is no such an element - /// true if there are elements in the current segment, false otherwise - internal bool TryPeek(out T result) + /// Tries to peek at an element from the queue, without removing it. 
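        /// (For example, IsEmpty peeks with resultUsed: false, which skips marking the segment as
        /// preserved for observation so dequeued slots can still be reused, whereas TryPeek(out T)
        /// passes resultUsed: true, preserving the segment so the returned value cannot be a torn
        /// read.)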
            /// <summary>
-            /// return the position of the head of the current segment
-            /// Value range [0, SEGMENT_SIZE], if it's SEGMENT_SIZE, it means this segment is exhausted and thus empty
+            /// Attempts to enqueue the item. If successful, the item will be stored
+            /// in the queue and true will be returned; otherwise, the item won't be stored, and false
+            /// will be returned.
            /// </summary>
-            internal int Low
+            public bool TryEnqueue(T item)
            {
-                get
+                // Loop in case of contention...
+                var spinner = new SpinWait();
+                while (true)
                {
-                    return Math.Min(m_low, SEGMENT_SIZE);
+                    // Get the tail at which to try to return.
+                    int currentTail = Volatile.Read(ref _headAndTail.Tail);
+                    int slotsIndex = currentTail & _slotsMask;
+
+                    // Read the sequence number for the tail position.
+                    int sequenceNumber = Volatile.Read(ref _slots[slotsIndex].SequenceNumber);
+
+                    // The slot is empty and ready for us to enqueue into it if its sequence
+                    // number matches the slot.
+                    if (sequenceNumber == currentTail)
+                    {
+                        // We may be racing with other enqueuers. Try to reserve the slot by incrementing
+                        // the tail. Once we've done that, no one else will be able to write to this slot,
+                        // and no dequeuer will be able to read from this slot until we've written the new
+                        // sequence number. WARNING: The next few lines are not reliable on a runtime that
+                        // supports thread aborts. If a thread abort were to sneak in after the CompareExchange
+                        // but before the Volatile.Write, other threads will spin trying to access this slot.
+                        // If this implementation is ever used on such a platform, this if block should be
+                        // wrapped in a finally / prepared region.
+                        if (Interlocked.CompareExchange(ref _headAndTail.Tail, currentTail + 1, currentTail) == currentTail)
+                        {
+                            // Successfully reserved the slot. Note that after the above CompareExchange, other threads
+                            // trying to return will end up spinning until we do the subsequent Write.
+                            _slots[slotsIndex].Item = item;
+                            Volatile.Write(ref _slots[slotsIndex].SequenceNumber, currentTail + 1);
+                            return true;
+                        }
+                    }
+                    else if (sequenceNumber < currentTail)
+                    {
+                        // The sequence number was less than what we needed, which means this slot still
+                        // contains a value, i.e. the segment is full. Technically it's possible that multiple
+                        // dequeuers could have read concurrently, with those getting later slots actually
+                        // finishing first, so there could be spaces after this one that are available, but
+                        // we need to enqueue in order.
+                        return false;
+                    }
+
+                    // Lost a race. Spin a bit, then try again.
+                    spinner.SpinOnce();
                }
            }
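
When TryEnqueue reports a full segment, the queue (per the design notes at the top of this file) freezes that segment for further enqueues and links a new, larger segment as the tail; that cross-segment path lives elsewhere in this patch. The sketch below only mimics its shape, with stand-in types, hypothetical names, and a doubling growth policy assumed purely for illustration.

using System;
using System.Collections.Generic;

sealed class SegmentStandIn
{
    public readonly int Capacity;
    public readonly Queue<int> Items = new Queue<int>();
    public bool FrozenForEnqueues;
    public SegmentStandIn NextSegment;

    public SegmentStandIn(int capacity) { Capacity = capacity; }

    public bool TryEnqueue(int item)
    {
        if (FrozenForEnqueues || Items.Count == Capacity) return false;
        Items.Enqueue(item);
        return true;
    }

    public void EnsureFrozen() { FrozenForEnqueues = true; }
}

static class GrowthSketch
{
    const int MaxSegmentLength = 1024 * 1024; // same cap the real queue uses
    static SegmentStandIn s_tail = new SegmentStandIn(capacity: 32);

    static void Enqueue(int item)
    {
        while (true)
        {
            SegmentStandIn tail = s_tail;
            if (tail.TryEnqueue(item)) return;

            // Fast path failed: freeze the current tail so late enqueuers also bail out,
            // then link a larger segment and publish it as the new tail (the real code
            // does this under a cross-segment lock).
            tail.EnsureFrozen();
            var newTail = new SegmentStandIn(Math.Min(tail.Capacity * 2, MaxSegmentLength));
            tail.NextSegment = newTail;
            s_tail = newTail;
        }
    }

    static void Main()
    {
        for (int i = 0; i < 100; i++) Enqueue(i);
        Console.WriteLine(s_tail.Capacity); // 128: grew twice from the 32-slot stand-in
    }
}
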
-            /// <summary>
-            /// return the logical position of the tail of the current segment
-            /// Value range [-1, SEGMENT_SIZE-1]. When it's -1, it means this is a new segment and has no elemnet yet
-            /// </summary>
-            internal int High
+            /// <summary>Represents a slot in the queue.</summary>
+            [StructLayout(LayoutKind.Auto)]
+            [DebuggerDisplay("Item = {Item}, SequenceNumber = {SequenceNumber}")]
+            internal struct Slot
            {
-                get
-                {
-                    //if m_high > SEGMENT_SIZE, it means it's out of range, we should return
-                    //SEGMENT_SIZE-1 as the logical position
-                    return Math.Min(m_high, SEGMENT_SIZE - 1);
-                }
+                /// <summary>The item.</summary>
+                public T Item;
+                /// <summary>The sequence number for this slot, used to synchronize between enqueuers and dequeuers.</summary>
+                public int SequenceNumber;
            }
-            }
-        }//end of class Segment
+        }

-    /// <summary>
-    /// A wrapper struct for volatile bool, please note the copy of the struct it self will not be volatile
-    /// for example this statement will not include in volatilness operation volatileBool1 = volatileBool2 the jit will copy the struct and will ignore the volatile
-    /// </summary>
-    struct VolatileBool
+    /// <summary>Padded head and tail indices, to avoid false sharing between producers and consumers.</summary>
+    [DebuggerDisplay("Head = {Head}, Tail = {Tail}")]
+    [StructLayout(LayoutKind.Explicit, Size = 192)] // padding before/between/after fields based on typical cache line size of 64
+    internal struct PaddedHeadAndTail
    {
-        public VolatileBool(bool value)
-        {
-            m_value = value;
-        }
-        public volatile bool m_value;
+        [FieldOffset(64)] public int Head;
+        [FieldOffset(128)] public int Tail;
    }
}
-- 
2.7.4
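
PaddedHeadAndTail spends 192 bytes on two ints so that the head index, the tail index, and whatever is laid out before or after the struct each get a 64-byte cache line to themselves. Below is a self-contained sketch of the false sharing this guards against; the timing harness and type names are illustrative, and the measured gap varies by hardware.

using System;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading.Tasks;

struct UnpaddedHeadAndTail
{
    public int Head; // adjacent fields: almost certainly on the same cache line
    public int Tail;
}

[StructLayout(LayoutKind.Explicit, Size = 192)]
struct PaddedHeadAndTailSketch
{
    [FieldOffset(64)] public int Head; // each field gets a cache line to itself
    [FieldOffset(128)] public int Tail;
}

static class FalseSharingSketch
{
    const int Iterations = 100_000_000;
    static UnpaddedHeadAndTail s_unpadded;
    static PaddedHeadAndTailSketch s_padded;

    static void Main()
    {
        long unpaddedMs = Run(padded: false);
        long paddedMs = Run(padded: true);
        Console.WriteLine($"unpadded: {unpaddedMs} ms");
        Console.WriteLine($"padded:   {paddedMs} ms");
    }

    static long Run(bool padded)
    {
        var sw = Stopwatch.StartNew();
        Task heads = Task.Run(() =>
        {
            for (int i = 0; i < Iterations; i++)
            {
                if (padded) s_padded.Head++; else s_unpadded.Head++;
            }
        });
        Task tails = Task.Run(() =>
        {
            for (int i = 0; i < Iterations; i++)
            {
                if (padded) s_padded.Tail++; else s_unpadded.Tail++;
            }
        });
        Task.WaitAll(heads, tails);
        return sw.ElapsedMilliseconds;
    }
}

Because producers mostly touch Tail and consumers mostly touch Head, keeping the two counters on separate cache lines means an enqueue-heavy thread does not keep invalidating the line a dequeue-heavy thread is spinning on.
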