From e424fbb0978c156aa9fc14dab439e29d9d6bd855 Mon Sep 17 00:00:00 2001 From: Stephen Toub Date: Wed, 25 Oct 2017 18:55:40 -0400 Subject: [PATCH] Add System.Threading.Channels to corefx Bring the source over from corefxlab, add a package, get everything building, etc. Commit migrated from https://github.com/dotnet/corefx/commit/c8ec3a850bd05aa8d8a6067071b723366b5648a3 --- .../Concurrent/SingleProducerConsumerQueue.cs | 315 ++++++++++++++ .../System.Threading.Channels.sln | 64 +++ src/libraries/System.Threading.Channels/dir.props | 8 + .../pkg/System.Threading.Channels.pkgproj | 14 + .../ref/Configurations.props | 8 + .../ref/System.Threading.Channels.cs | 82 ++++ .../ref/System.Threading.Channels.csproj | 17 + .../src/Configurations.props | 8 + .../src/Resources/Strings.resx | 123 ++++++ .../src/System.Threading.Channels.csproj | 45 ++ .../src/System/Collections/Generic/Dequeue.cs | 124 ++++++ .../System/Threading/Channels/BoundedChannel.cs | 411 ++++++++++++++++++ .../Threading/Channels/BoundedChannelFullMode.cs | 19 + .../src/System/Threading/Channels/Channel.cs | 76 ++++ .../Threading/Channels/ChannelClosedException.cs | 28 ++ .../System/Threading/Channels/ChannelOptions.cs | 107 +++++ .../src/System/Threading/Channels/ChannelReader.cs | 34 ++ .../System/Threading/Channels/ChannelUtilities.cs | 138 +++++++ .../src/System/Threading/Channels/ChannelWriter.cs | 79 ++++ .../src/System/Threading/Channels/Channel_1.cs | 10 + .../src/System/Threading/Channels/Channel_2.cs | 29 ++ .../System/Threading/Channels/IDebugEnumerator.cs | 30 ++ .../src/System/Threading/Channels/Interactor.cs | 101 +++++ .../Channels/SingleConsumerUnboundedChannel.cs | 236 +++++++++++ .../System/Threading/Channels/UnboundedChannel.cs | 232 +++++++++++ .../System/Threading/Channels/UnbufferedChannel.cs | 217 ++++++++++ .../src/System/VoidResult.cs | 9 + .../tests/BoundedChannelTests.cs | 402 ++++++++++++++++++ .../tests/ChannelClosedExceptionTests.cs | 27 ++ .../tests/ChannelTestBase.cs | 457 +++++++++++++++++++++ .../tests/ChannelTests.cs | 145 +++++++ .../tests/Configurations.props | 8 + .../tests/DebuggerAttributes.cs | 145 +++++++ .../tests/System.Threading.Channels.Tests.csproj | 21 + .../System.Threading.Channels/tests/TestBase.cs | 48 +++ .../tests/TestExtensions.cs | 36 ++ .../tests/UnboundedChannelTests.cs | 213 ++++++++++ .../tests/UnbufferedChannelTests.cs | 94 +++++ .../packageIndex.json | 6 + src/libraries/pkg/descriptions.json | 8 + 40 files changed, 4174 insertions(+) create mode 100644 src/libraries/Common/src/System/Collections/Concurrent/SingleProducerConsumerQueue.cs create mode 100644 src/libraries/System.Threading.Channels/System.Threading.Channels.sln create mode 100644 src/libraries/System.Threading.Channels/dir.props create mode 100644 src/libraries/System.Threading.Channels/pkg/System.Threading.Channels.pkgproj create mode 100644 src/libraries/System.Threading.Channels/ref/Configurations.props create mode 100644 src/libraries/System.Threading.Channels/ref/System.Threading.Channels.cs create mode 100644 src/libraries/System.Threading.Channels/ref/System.Threading.Channels.csproj create mode 100644 src/libraries/System.Threading.Channels/src/Configurations.props create mode 100644 src/libraries/System.Threading.Channels/src/Resources/Strings.resx create mode 100644 src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj create mode 100644 src/libraries/System.Threading.Channels/src/System/Collections/Generic/Dequeue.cs create mode 100644 src/libraries/System.Threading.Channels/src/System/Threading/Channels/BoundedChannel.cs create mode 100644 src/libraries/System.Threading.Channels/src/System/Threading/Channels/BoundedChannelFullMode.cs create mode 100644 src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.cs create mode 100644 src/libraries/System.Threading.Channels/src/System/Threading/Channels/ChannelClosedException.cs create mode 100644 src/libraries/System.Threading.Channels/src/System/Threading/Channels/ChannelOptions.cs create mode 100644 src/libraries/System.Threading.Channels/src/System/Threading/Channels/ChannelReader.cs create mode 100644 src/libraries/System.Threading.Channels/src/System/Threading/Channels/ChannelUtilities.cs create mode 100644 src/libraries/System.Threading.Channels/src/System/Threading/Channels/ChannelWriter.cs create mode 100644 src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel_1.cs create mode 100644 src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel_2.cs create mode 100644 src/libraries/System.Threading.Channels/src/System/Threading/Channels/IDebugEnumerator.cs create mode 100644 src/libraries/System.Threading.Channels/src/System/Threading/Channels/Interactor.cs create mode 100644 src/libraries/System.Threading.Channels/src/System/Threading/Channels/SingleConsumerUnboundedChannel.cs create mode 100644 src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnboundedChannel.cs create mode 100644 src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnbufferedChannel.cs create mode 100644 src/libraries/System.Threading.Channels/src/System/VoidResult.cs create mode 100644 src/libraries/System.Threading.Channels/tests/BoundedChannelTests.cs create mode 100644 src/libraries/System.Threading.Channels/tests/ChannelClosedExceptionTests.cs create mode 100644 src/libraries/System.Threading.Channels/tests/ChannelTestBase.cs create mode 100644 src/libraries/System.Threading.Channels/tests/ChannelTests.cs create mode 100644 src/libraries/System.Threading.Channels/tests/Configurations.props create mode 100644 src/libraries/System.Threading.Channels/tests/DebuggerAttributes.cs create mode 100644 src/libraries/System.Threading.Channels/tests/System.Threading.Channels.Tests.csproj create mode 100644 src/libraries/System.Threading.Channels/tests/TestBase.cs create mode 100644 src/libraries/System.Threading.Channels/tests/TestExtensions.cs create mode 100644 src/libraries/System.Threading.Channels/tests/UnboundedChannelTests.cs create mode 100644 src/libraries/System.Threading.Channels/tests/UnbufferedChannelTests.cs diff --git a/src/libraries/Common/src/System/Collections/Concurrent/SingleProducerConsumerQueue.cs b/src/libraries/Common/src/System/Collections/Concurrent/SingleProducerConsumerQueue.cs new file mode 100644 index 0000000..c28db22 --- /dev/null +++ b/src/libraries/Common/src/System/Collections/Concurrent/SingleProducerConsumerQueue.cs @@ -0,0 +1,315 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.Collections.Generic; +using System.Diagnostics; +using System.Runtime.InteropServices; +using System.Threading; + +namespace System.Collections.Concurrent +{ + /// + /// Provides a producer/consumer queue safe to be used by only one producer and one consumer concurrently. + /// + /// Specifies the type of data contained in the queue. + [DebuggerDisplay("Count = {Count}")] + [DebuggerTypeProxy(typeof(SingleProducerSingleConsumerQueue<>.SingleProducerSingleConsumerQueue_DebugView))] + internal sealed class SingleProducerSingleConsumerQueue : IEnumerable + { + // Design: + // + // SingleProducerSingleConsumerQueue (SPSCQueue) is a concurrent queue designed to be used + // by one producer thread and one consumer thread. SPSCQueue does not work correctly when used by + // multiple producer threads concurrently or multiple consumer threads concurrently. + // + // SPSCQueue is based on segments that behave like circular buffers. Each circular buffer is represented + // as an array with two indexes: _first and _last. _first is the index of the array slot for the consumer + // to read next, and _last is the slot for the producer to write next. The circular buffer is empty when + // (_first == _last), and full when ((_last+1) % _array.Length == _first). + // + // Since _first is only ever modified by the consumer thread and _last by the producer, the two indices can + // be updated without interlocked operations. As long as the queue size fits inside a single circular buffer, + // enqueues and dequeues simply advance the corresponding indices around the circular buffer. If an enqueue finds + // that there is no room in the existing buffer, however, a new circular buffer is allocated that is twice as big + // as the old buffer. From then on, the producer will insert values into the new buffer. The consumer will first + // empty out the old buffer and only then follow the producer into the new (larger) buffer. + // + // As described above, the enqueue operation on the fast path only modifies the _first field of the current segment. + // However, it also needs to read _last in order to verify that there is room in the current segment. Similarly, the + // dequeue operation on the fast path only needs to modify _last, but also needs to read _first to verify that the + // queue is non-empty. This results in true cache line sharing between the producer and the consumer. + // + // The cache line sharing issue can be mitigating by having a possibly stale copy of _first that is owned by the producer, + // and a possibly stale copy of _last that is owned by the consumer. So, the consumer state is described using + // (_first, _lastCopy) and the producer state using (_firstCopy, _last). The consumer state is separated from + // the producer state by padding, which allows fast-path enqueues and dequeues from hitting shared cache lines. + // _lastCopy is the consumer's copy of _last. Whenever the consumer can tell that there is room in the buffer + // simply by observing _lastCopy, the consumer thread does not need to read _last and thus encounter a cache miss. Only + // when the buffer appears to be empty will the consumer refresh _lastCopy from _last. _firstCopy is used by the producer + // in the same way to avoid reading _first on the hot path. + + /// The initial size to use for segments (in number of elements). + private const int InitialSegmentSize = 32; // must be a power of 2 + /// The maximum size to use for segments (in number of elements). + private const int MaxSegmentSize = 0x1000000; // this could be made as large as Int32.MaxValue / 2 + + /// The head of the linked list of segments. + private volatile Segment _head; + /// The tail of the linked list of segments. + private volatile Segment _tail; + + /// Initializes the queue. + public SingleProducerSingleConsumerQueue() + { + // Validate constants in ctor rather than in an explicit cctor that would cause perf degradation + Debug.Assert(InitialSegmentSize > 0, "Initial segment size must be > 0."); + Debug.Assert((InitialSegmentSize & (InitialSegmentSize - 1)) == 0, "Initial segment size must be a power of 2"); + Debug.Assert(InitialSegmentSize <= MaxSegmentSize, "Initial segment size should be <= maximum."); + Debug.Assert(MaxSegmentSize < int.MaxValue / 2, "Max segment size * 2 must be < Int32.MaxValue, or else overflow could occur."); + + // Initialize the queue + _head = _tail = new Segment(InitialSegmentSize); + } + + /// Enqueues an item into the queue. + /// The item to enqueue. + public void Enqueue(T item) + { + Segment segment = _tail; + T[] array = segment._array; + int last = segment._state._last; // local copy to avoid multiple volatile reads + + // Fast path: there's obviously room in the current segment + int tail2 = (last + 1) & (array.Length - 1); + if (tail2 != segment._state._firstCopy) + { + array[last] = item; + segment._state._last = tail2; + } + // Slow path: there may not be room in the current segment. + else EnqueueSlow(item, ref segment); + } + + /// Enqueues an item into the queue. + /// The item to enqueue. + /// The segment in which to first attempt to store the item. + private void EnqueueSlow(T item, ref Segment segment) + { + Debug.Assert(segment != null, "Expected a non-null segment."); + + if (segment._state._firstCopy != segment._state._first) + { + segment._state._firstCopy = segment._state._first; + Enqueue(item); // will only recur once for this enqueue operation + return; + } + + int newSegmentSize = _tail._array.Length << 1; // double size + Debug.Assert(newSegmentSize > 0, "The max size should always be small enough that we don't overflow."); + if (newSegmentSize > MaxSegmentSize) newSegmentSize = MaxSegmentSize; + + var newSegment = new Segment(newSegmentSize); + newSegment._array[0] = item; + newSegment._state._last = 1; + newSegment._state._lastCopy = 1; + + try { } + finally + { + // Finally block to protect against corruption due to a thread abort + // between setting _next and setting _tail. + Volatile.Write(ref _tail._next, newSegment); // ensure segment not published until item is fully stored + _tail = newSegment; + } + } + + /// Attempts to dequeue an item from the queue. + /// The dequeued item. + /// true if an item could be dequeued; otherwise, false. + public bool TryDequeue(out T result) + { + Segment segment = _head; + T[] array = segment._array; + int first = segment._state._first; // local copy to avoid multiple volatile reads + + // Fast path: there's obviously data available in the current segment + if (first != segment._state._lastCopy) + { + result = array[first]; + array[first] = default; // Clear the slot to release the element + segment._state._first = (first + 1) & (array.Length - 1); + return true; + } + // Slow path: there may not be data available in the current segment + else return TryDequeueSlow(ref segment, ref array, out result); + } + + /// Attempts to dequeue an item from the queue. + /// The array from which the item was dequeued. + /// The segment from which the item was dequeued. + /// The dequeued item. + /// true if an item could be dequeued; otherwise, false. + private bool TryDequeueSlow(ref Segment segment, ref T[] array, out T result) + { + Debug.Assert(segment != null, "Expected a non-null segment."); + Debug.Assert(array != null, "Expected a non-null item array."); + + if (segment._state._last != segment._state._lastCopy) + { + segment._state._lastCopy = segment._state._last; + return TryDequeue(out result); // will only recur once for this dequeue operation + } + + if (segment._next != null && segment._state._first == segment._state._last) + { + segment = segment._next; + array = segment._array; + _head = segment; + } + + int first = segment._state._first; // local copy to avoid extraneous volatile reads + + if (first == segment._state._last) + { + result = default; + return false; + } + + result = array[first]; + array[first] = default; // Clear the slot to release the element + segment._state._first = (first + 1) & (segment._array.Length - 1); + segment._state._lastCopy = segment._state._last; // Refresh _lastCopy to ensure that _first has not passed _lastCopy + + return true; + } + + /// Gets whether the collection is currently empty. + public bool IsEmpty + { + // This implementation is optimized for calls from the consumer. + get + { + Segment head = _head; + if (head._state._first != head._state._lastCopy) return false; // _first is volatile, so the read of _lastCopy cannot get reordered + if (head._state._first != head._state._last) return false; + return head._next == null; + } + } + + /// Gets an enumerable for the collection. + /// This method is not safe to use concurrently with any other members that may mutate the collection. + public IEnumerator GetEnumerator() + { + for (Segment segment = _head; segment != null; segment = segment._next) + { + for (int pt = segment._state._first; + pt != segment._state._last; + pt = (pt + 1) & (segment._array.Length - 1)) + { + yield return segment._array[pt]; + } + } + } + + IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); + + /// Gets the number of items in the collection. + /// This method is not safe to use concurrently with any other members that may mutate the collection. + internal int Count + { + get + { + int count = 0; + for (Segment segment = _head; segment != null; segment = segment._next) + { + int arraySize = segment._array.Length; + int first, last; + while (true) // Count is not meant to be used concurrently, but this helps to avoid issues if it is + { + first = segment._state._first; + last = segment._state._last; + if (first == segment._state._first) break; + } + count += (last - first) & (arraySize - 1); + } + return count; + } + } + + /// A segment in the queue containing one or more items. + [StructLayout(LayoutKind.Sequential)] + private sealed class Segment + { + /// The next segment in the linked list of segments. + internal Segment _next; + /// The data stored in this segment. + internal readonly T[] _array; + /// Details about the segment. + internal SegmentState _state; // separated out to enable StructLayout attribute to take effect + + /// Initializes the segment. + /// The size to use for this segment. + internal Segment(int size) + { + Debug.Assert((size & (size - 1)) == 0, "Size must be a power of 2"); + _array = new T[size]; + } + } + + /// Stores information about a segment. + [StructLayout(LayoutKind.Sequential)] // enforce layout so that padding reduces false sharing + private struct SegmentState + { + /// Padding to reduce false sharing between the segment's array and _first. + internal PaddingFor32 _pad0; + + /// The index of the current head in the segment. + internal volatile int _first; + /// A copy of the current tail index. + internal int _lastCopy; // not volatile as read and written by the producer, except for IsEmpty, and there _lastCopy is only read after reading the volatile _first + + /// Padding to reduce false sharing between the first and last. + internal PaddingFor32 _pad1; + + /// A copy of the current head index. + internal int _firstCopy; // not volatile as only read and written by the consumer thread + /// The index of the current tail in the segment. + internal volatile int _last; + + /// Padding to reduce false sharing with the last and what's after the segment. + internal PaddingFor32 _pad2; + } + + /// Debugger type proxy for a SingleProducerSingleConsumerQueue of T. + private sealed class SingleProducerSingleConsumerQueue_DebugView + { + /// The queue being visualized. + private readonly SingleProducerSingleConsumerQueue _queue; + + /// Initializes the debug view. + /// The queue being debugged. + public SingleProducerSingleConsumerQueue_DebugView(SingleProducerSingleConsumerQueue queue) + { + Debug.Assert(queue != null, "Expected a non-null queue."); + _queue = queue; + } + + /// Gets the contents of the list. + [DebuggerBrowsable(DebuggerBrowsableState.RootHidden)] + public T[] Items => new List(_queue).ToArray(); + } + } + + + /// A placeholder class for common padding constants and eventually routines. + internal static class PaddingHelpers + { + /// A size greater than or equal to the size of the most common CPU cache lines. + internal const int CACHE_LINE_SIZE = 128; + } + + /// Padding structure used to minimize false sharing in SingleProducerSingleConsumerQueue{T}. + [StructLayout(LayoutKind.Explicit, Size = PaddingHelpers.CACHE_LINE_SIZE - sizeof(int))] // Based on common case of 64-byte cache lines + internal struct PaddingFor32 { } +} diff --git a/src/libraries/System.Threading.Channels/System.Threading.Channels.sln b/src/libraries/System.Threading.Channels/System.Threading.Channels.sln new file mode 100644 index 0000000..7a2097c --- /dev/null +++ b/src/libraries/System.Threading.Channels/System.Threading.Channels.sln @@ -0,0 +1,64 @@ +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio 15 +VisualStudioVersion = 15.0.27019.1 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "System.Threading.Channels.Tests", "tests\System.Threading.Channels.Tests.csproj", "{95DFC527-4DC1-495E-97D7-E94EE1F7140D}" + ProjectSection(ProjectDependencies) = postProject + {1DD0FF15-6234-4BD6-850A-317F05479554} = {1DD0FF15-6234-4BD6-850A-317F05479554} + EndProjectSection +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "System.Threading.Channels", "src\System.Threading.Channels.csproj", "{1DD0FF15-6234-4BD6-850A-317F05479554}" + ProjectSection(ProjectDependencies) = postProject + {9C524CA0-92FF-437B-B568-BCE8A794A69A} = {9C524CA0-92FF-437B-B568-BCE8A794A69A} + EndProjectSection +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "System.Threading.Channels", "ref\System.Threading.Channels.csproj", "{9C524CA0-92FF-437B-B568-BCE8A794A69A}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tests", "tests", "{1A2F9F4A-A032-433E-B914-ADD5992BB178}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{E107E9C1-E893-4E87-987E-04EF0DCEAEFD}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + netstandard-Debug|Any CPU = netstandard-Debug|Any CPU + netstandard-Release|Any CPU = netstandard-Release|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {95DFC527-4DC1-495E-97D7-E94EE1F7140D}.Debug|Any CPU.ActiveCfg = netstandard-Debug|Any CPU + {95DFC527-4DC1-495E-97D7-E94EE1F7140D}.Debug|Any CPU.Build.0 = netstandard-Debug|Any CPU + {95DFC527-4DC1-495E-97D7-E94EE1F7140D}.netstandard-Debug|Any CPU.ActiveCfg = netstandard-Debug|Any CPU + {95DFC527-4DC1-495E-97D7-E94EE1F7140D}.netstandard-Debug|Any CPU.Build.0 = netstandard-Debug|Any CPU + {95DFC527-4DC1-495E-97D7-E94EE1F7140D}.netstandard-Release|Any CPU.ActiveCfg = netstandard-Release|Any CPU + {95DFC527-4DC1-495E-97D7-E94EE1F7140D}.netstandard-Release|Any CPU.Build.0 = netstandard-Release|Any CPU + {95DFC527-4DC1-495E-97D7-E94EE1F7140D}.Release|Any CPU.ActiveCfg = netstandard-Release|Any CPU + {95DFC527-4DC1-495E-97D7-E94EE1F7140D}.Release|Any CPU.Build.0 = netstandard-Release|Any CPU + {1DD0FF15-6234-4BD6-850A-317F05479554}.Debug|Any CPU.ActiveCfg = netstandard1.3-Debug|Any CPU + {1DD0FF15-6234-4BD6-850A-317F05479554}.Debug|Any CPU.Build.0 = netstandard1.3-Debug|Any CPU + {1DD0FF15-6234-4BD6-850A-317F05479554}.netstandard-Debug|Any CPU.ActiveCfg = netstandard1.3-Release|Any CPU + {1DD0FF15-6234-4BD6-850A-317F05479554}.netstandard-Debug|Any CPU.Build.0 = netstandard1.3-Release|Any CPU + {1DD0FF15-6234-4BD6-850A-317F05479554}.netstandard-Release|Any CPU.ActiveCfg = netstandard1.3-Release|Any CPU + {1DD0FF15-6234-4BD6-850A-317F05479554}.netstandard-Release|Any CPU.Build.0 = netstandard1.3-Release|Any CPU + {1DD0FF15-6234-4BD6-850A-317F05479554}.Release|Any CPU.ActiveCfg = netstandard1.3-Release|Any CPU + {1DD0FF15-6234-4BD6-850A-317F05479554}.Release|Any CPU.Build.0 = netstandard1.3-Release|Any CPU + {9C524CA0-92FF-437B-B568-BCE8A794A69A}.Debug|Any CPU.ActiveCfg = netstandard1.3-Debug|Any CPU + {9C524CA0-92FF-437B-B568-BCE8A794A69A}.Debug|Any CPU.Build.0 = netstandard1.3-Debug|Any CPU + {9C524CA0-92FF-437B-B568-BCE8A794A69A}.netstandard-Debug|Any CPU.ActiveCfg = netstandard1.3-Release|Any CPU + {9C524CA0-92FF-437B-B568-BCE8A794A69A}.netstandard-Debug|Any CPU.Build.0 = netstandard1.3-Release|Any CPU + {9C524CA0-92FF-437B-B568-BCE8A794A69A}.netstandard-Release|Any CPU.ActiveCfg = netstandard1.3-Release|Any CPU + {9C524CA0-92FF-437B-B568-BCE8A794A69A}.netstandard-Release|Any CPU.Build.0 = netstandard1.3-Release|Any CPU + {9C524CA0-92FF-437B-B568-BCE8A794A69A}.Release|Any CPU.ActiveCfg = netstandard1.3-Release|Any CPU + {9C524CA0-92FF-437B-B568-BCE8A794A69A}.Release|Any CPU.Build.0 = netstandard1.3-Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(NestedProjects) = preSolution + {95DFC527-4DC1-495E-97D7-E94EE1F7140D} = {1A2F9F4A-A032-433E-B914-ADD5992BB178} + {1DD0FF15-6234-4BD6-850A-317F05479554} = {E107E9C1-E893-4E87-987E-04EF0DCEAEFD} + EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {83C15975-72A6-4FC2-9694-46EF0F4C7A3D} + EndGlobalSection +EndGlobal diff --git a/src/libraries/System.Threading.Channels/dir.props b/src/libraries/System.Threading.Channels/dir.props new file mode 100644 index 0000000..4356dec --- /dev/null +++ b/src/libraries/System.Threading.Channels/dir.props @@ -0,0 +1,8 @@ + + + + + 4.0.0.0 + Open + + \ No newline at end of file diff --git a/src/libraries/System.Threading.Channels/pkg/System.Threading.Channels.pkgproj b/src/libraries/System.Threading.Channels/pkg/System.Threading.Channels.pkgproj new file mode 100644 index 0000000..c96fb26 --- /dev/null +++ b/src/libraries/System.Threading.Channels/pkg/System.Threading.Channels.pkgproj @@ -0,0 +1,14 @@ + + + + + + 2.8.6 + + + + netcoreapp2.0;net461;$(AllXamarinFrameworks) + + + + diff --git a/src/libraries/System.Threading.Channels/ref/Configurations.props b/src/libraries/System.Threading.Channels/ref/Configurations.props new file mode 100644 index 0000000..78953df --- /dev/null +++ b/src/libraries/System.Threading.Channels/ref/Configurations.props @@ -0,0 +1,8 @@ + + + + + netstandard; + + + diff --git a/src/libraries/System.Threading.Channels/ref/System.Threading.Channels.cs b/src/libraries/System.Threading.Channels/ref/System.Threading.Channels.cs new file mode 100644 index 0000000..c76c10a --- /dev/null +++ b/src/libraries/System.Threading.Channels/ref/System.Threading.Channels.cs @@ -0,0 +1,82 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. +// ------------------------------------------------------------------------------ +// Changes to this file must follow the http://aka.ms/api-review process. +// ------------------------------------------------------------------------------ + +namespace System.Threading.Channels +{ + public enum BoundedChannelFullMode + { + DropNewest = 1, + DropOldest = 2, + DropWrite = 3, + Wait = 0, + } + public sealed partial class BoundedChannelOptions : System.Threading.Channels.ChannelOptions + { + public BoundedChannelOptions(int capacity) { } + public int Capacity { get { throw null; } set { } } + public System.Threading.Channels.BoundedChannelFullMode FullMode { get { throw null; } set { } } + } + public static partial class Channel + { + public static System.Threading.Channels.Channel CreateBounded(int capacity) { throw null; } + public static System.Threading.Channels.Channel CreateBounded(System.Threading.Channels.BoundedChannelOptions options) { throw null; } + public static System.Threading.Channels.Channel CreateUnbounded() { throw null; } + public static System.Threading.Channels.Channel CreateUnbounded(System.Threading.Channels.UnboundedChannelOptions options) { throw null; } + public static System.Threading.Channels.Channel CreateUnbuffered() { throw null; } + public static System.Threading.Channels.Channel CreateUnbuffered(System.Threading.Channels.UnbufferedChannelOptions options) { throw null; } + } + public partial class ChannelClosedException : System.InvalidOperationException + { + public ChannelClosedException() { } + public ChannelClosedException(System.Exception innerException) { } + public ChannelClosedException(string message) { } + public ChannelClosedException(string message, System.Exception innerException) { } + } + public abstract partial class ChannelOptions + { + protected ChannelOptions() { } + public bool AllowSynchronousContinuations { get { throw null; } set { } } + public bool SingleReader { get { throw null; } set { } } + public bool SingleWriter { get { throw null; } set { } } + } + public abstract partial class ChannelReader + { + protected ChannelReader() { } + public virtual System.Threading.Tasks.Task Completion { get { throw null; } } + public abstract bool TryRead(out T item); + public abstract System.Threading.Tasks.Task WaitToReadAsync(System.Threading.CancellationToken cancellationToken=default); + } + public abstract partial class ChannelWriter + { + protected ChannelWriter() { } + public void Complete(System.Exception error=null) { } + public virtual bool TryComplete(System.Exception error=null) { throw null; } + public abstract bool TryWrite(T item); + public abstract System.Threading.Tasks.Task WaitToWriteAsync(System.Threading.CancellationToken cancellationToken=default); + public virtual System.Threading.Tasks.Task WriteAsync(T item, System.Threading.CancellationToken cancellationToken=default) { throw null; } + } + public abstract partial class Channel : System.Threading.Channels.Channel + { + protected Channel() { } + } + public abstract partial class Channel + { + protected Channel() { } + public System.Threading.Channels.ChannelReader Reader { get { throw null; } protected set { } } + public System.Threading.Channels.ChannelWriter Writer { get { throw null; } protected set { } } + public static implicit operator System.Threading.Channels.ChannelReader (System.Threading.Channels.Channel channel) { throw null; } + public static implicit operator System.Threading.Channels.ChannelWriter (System.Threading.Channels.Channel channel) { throw null; } + } + public sealed partial class UnboundedChannelOptions : System.Threading.Channels.ChannelOptions + { + public UnboundedChannelOptions() { } + } + public sealed partial class UnbufferedChannelOptions : System.Threading.Channels.ChannelOptions + { + public UnbufferedChannelOptions() { } + } +} diff --git a/src/libraries/System.Threading.Channels/ref/System.Threading.Channels.csproj b/src/libraries/System.Threading.Channels/ref/System.Threading.Channels.csproj new file mode 100644 index 0000000..557c7f0 --- /dev/null +++ b/src/libraries/System.Threading.Channels/ref/System.Threading.Channels.csproj @@ -0,0 +1,17 @@ + + + + + {9C524CA0-92FF-437B-B568-BCE8A794A69A} + + + + + + + + + + + + diff --git a/src/libraries/System.Threading.Channels/src/Configurations.props b/src/libraries/System.Threading.Channels/src/Configurations.props new file mode 100644 index 0000000..78953df --- /dev/null +++ b/src/libraries/System.Threading.Channels/src/Configurations.props @@ -0,0 +1,8 @@ + + + + + netstandard; + + + diff --git a/src/libraries/System.Threading.Channels/src/Resources/Strings.resx b/src/libraries/System.Threading.Channels/src/Resources/Strings.resx new file mode 100644 index 0000000..2beea8a --- /dev/null +++ b/src/libraries/System.Threading.Channels/src/Resources/Strings.resx @@ -0,0 +1,123 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + text/microsoft-resx + + + 2.0 + + + System.Resources.ResXResourceReader, System.Windows.Forms, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089 + + + System.Resources.ResXResourceWriter, System.Windows.Forms, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089 + + + The channel has been closed. + + \ No newline at end of file diff --git a/src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj b/src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj new file mode 100644 index 0000000..deac429 --- /dev/null +++ b/src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj @@ -0,0 +1,45 @@ + + + + + {1DD0FF15-6234-4BD6-850A-317F05479554} + System.Threading.Channels + $(OutputPath)$(MSBuildProjectName).xml + + + + + + + + + + + + + + + + + + + + + + + Common\System\Collections\Concurrent\SingleProducerConsumerQueue.cs + + + + + + + + + + + + + + + diff --git a/src/libraries/System.Threading.Channels/src/System/Collections/Generic/Dequeue.cs b/src/libraries/System.Threading.Channels/src/System/Collections/Generic/Dequeue.cs new file mode 100644 index 0000000..6c44b73 --- /dev/null +++ b/src/libraries/System.Threading.Channels/src/System/Collections/Generic/Dequeue.cs @@ -0,0 +1,124 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.Diagnostics; + +namespace System.Collections.Generic +{ + /// Provides a double-ended queue data structure. + /// Type of the data stored in the dequeue. + [DebuggerDisplay("Count = {_size}")] + internal sealed class Dequeue + { + private T[] _array = Array.Empty(); + private int _head; // First valid element in the queue + private int _tail; // First open slot in the dequeue, unless the dequeue is full + private int _size; // Number of elements. + + public int Count => _size; + + public bool IsEmpty => _size == 0; + + public void EnqueueTail(T item) + { + if (_size == _array.Length) + { + Grow(); + } + + _array[_tail] = item; + if (++_tail == _array.Length) + { + _tail = 0; + } + _size++; + } + + //// Uncomment if/when enqueueing at the head is needed + //public void EnqueueHead(T item) + //{ + // if (_size == _array.Length) + // { + // Grow(); + // } + // + // _head = (_head == 0 ? _array.Length : _head) - 1; + // _array[_head] = item; + // _size++; + //} + + public T DequeueHead() + { + Debug.Assert(!IsEmpty); // caller's responsibility to make sure there are elements remaining + + T item = _array[_head]; + _array[_head] = default; + + if (++_head == _array.Length) + { + _head = 0; + } + _size--; + + return item; + } + + public T DequeueTail() + { + Debug.Assert(!IsEmpty); // caller's responsibility to make sure there are elements remaining + + if (--_tail == -1) + { + _tail = _array.Length - 1; + } + + T item = _array[_tail]; + _array[_tail] = default; + + _size--; + return item; + } + + public IEnumerator GetEnumerator() // meant for debug purposes only + { + int pos = _head; + int count = _size; + while (count-- > 0) + { + yield return _array[pos]; + pos = (pos + 1) % _array.Length; + } + } + + private void Grow() + { + Debug.Assert(_size == _array.Length); + Debug.Assert(_head == _tail); + + const int MinimumGrow = 4; + + int capacity = (int)(_array.Length * 2L); + if (capacity < _array.Length + MinimumGrow) + { + capacity = _array.Length + MinimumGrow; + } + + T[] newArray = new T[capacity]; + + if (_head == 0) + { + Array.Copy(_array, 0, newArray, 0, _size); + } + else + { + Array.Copy(_array, _head, newArray, 0, _array.Length - _head); + Array.Copy(_array, 0, newArray, _array.Length - _head, _tail); + } + + _array = newArray; + _head = 0; + _tail = _size; + } + } +} diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/BoundedChannel.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/BoundedChannel.cs new file mode 100644 index 0000000..0b7ea44 --- /dev/null +++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/BoundedChannel.cs @@ -0,0 +1,411 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.Collections.Generic; +using System.Diagnostics; +using System.Threading.Tasks; + +namespace System.Threading.Channels +{ + /// Provides a channel with a bounded capacity. + [DebuggerDisplay("Items={ItemsCountForDebugger}, Capacity={_bufferedCapacity}")] + [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<>))] + internal sealed class BoundedChannel : Channel, IDebugEnumerable + { + /// The mode used when the channel hits its bound. + private readonly BoundedChannelFullMode _mode; + /// Task signaled when the channel has completed. + private readonly TaskCompletionSource _completion; + /// The maximum capacity of the channel. + private readonly int _bufferedCapacity; + /// Items currently stored in the channel waiting to be read. + private readonly Dequeue _items = new Dequeue(); + /// Writers waiting to write to the channel. + private readonly Dequeue> _blockedWriters = new Dequeue>(); + /// Task signaled when any WaitToReadAsync waiters should be woken up. + private ReaderInteractor _waitingReaders; + /// Task signaled when any WaitToWriteAsync waiters should be woken up. + private ReaderInteractor _waitingWriters; + /// Whether to force continuations to be executed asynchronously from producer writes. + private readonly bool _runContinuationsAsynchronously; + /// Set to non-null once Complete has been called. + private Exception _doneWriting; + /// Gets an object used to synchronize all state on the instance. + private object SyncObj => _items; + + /// Initializes the . + /// The positive bounded capacity for the channel. + /// The mode used when writing to a full channel. + /// Whether to force continuations to be executed asynchronously. + internal BoundedChannel(int bufferedCapacity, BoundedChannelFullMode mode, bool runContinuationsAsynchronously) + { + Debug.Assert(bufferedCapacity > 0); + _bufferedCapacity = bufferedCapacity; + _mode = mode; + _runContinuationsAsynchronously = runContinuationsAsynchronously; + _completion = new TaskCompletionSource(runContinuationsAsynchronously ? TaskCreationOptions.RunContinuationsAsynchronously : TaskCreationOptions.None); + Reader = new BoundedChannelReader(this); + Writer = new BoundedChannelWriter(this); + } + + private sealed class BoundedChannelReader : ChannelReader + { + internal readonly BoundedChannel _parent; + internal BoundedChannelReader(BoundedChannel parent) => _parent = parent; + + public override Task Completion => _parent._completion.Task; + + public override bool TryRead(out T item) + { + BoundedChannel parent = _parent; + lock (parent.SyncObj) + { + parent.AssertInvariants(); + + // Get an item if there is one. + if (!parent._items.IsEmpty) + { + item = DequeueItemAndPostProcess(); + return true; + } + } + + item = default; + return false; + } + + public override Task WaitToReadAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return Task.FromCanceled(cancellationToken); + } + + BoundedChannel parent = _parent; + lock (parent.SyncObj) + { + parent.AssertInvariants(); + + // If there are any items available, a read is possible. + if (!parent._items.IsEmpty) + { + return ChannelUtilities.s_trueTask; + } + + // There were no items available, so if we're done writing, a read will never be possible. + if (parent._doneWriting != null) + { + return parent._doneWriting != ChannelUtilities.s_doneWritingSentinel ? + Task.FromException(parent._doneWriting) : + ChannelUtilities.s_falseTask; + } + + // There were no items available, but there could be in the future, so ensure + // there's a blocked reader task and return it. + return ChannelUtilities.GetOrCreateWaiter(ref parent._waitingReaders, parent._runContinuationsAsynchronously, cancellationToken); + } + } + + /// Dequeues an item, and then fixes up our state around writers and completion. + /// The dequeued item. + private T DequeueItemAndPostProcess() + { + BoundedChannel parent = _parent; + Debug.Assert(Monitor.IsEntered(parent.SyncObj)); + + // Dequeue an item. + T item = parent._items.DequeueHead(); + + // If we're now empty and we're done writing, complete the channel. + if (parent._doneWriting != null && parent._items.IsEmpty) + { + ChannelUtilities.Complete(parent._completion, parent._doneWriting); + } + + // If there are any writers blocked, there's now room for at least one + // to be promoted to have its item moved into the items queue. We need + // to loop while trying to complete the writer in order to find one that + // hasn't yet been canceled (canceled writers transition to canceled but + // remain in the physical queue). + while (!parent._blockedWriters.IsEmpty) + { + WriterInteractor w = parent._blockedWriters.DequeueHead(); + if (w.Success(default(VoidResult))) + { + parent._items.EnqueueTail(w.Item); + return item; + } + } + + // There was no blocked writer, so see if there's a WaitToWriteAsync + // we should wake up. + ChannelUtilities.WakeUpWaiters(ref parent._waitingWriters, result: true); + + // Return the item + return item; + } + } + + private sealed class BoundedChannelWriter : ChannelWriter + { + internal readonly BoundedChannel _parent; + internal BoundedChannelWriter(BoundedChannel parent) => _parent = parent; + + public override bool TryComplete(Exception error) + { + BoundedChannel parent = _parent; + bool completeTask; + lock (parent.SyncObj) + { + parent.AssertInvariants(); + + // If we've already marked the channel as completed, bail. + if (parent._doneWriting != null) + { + return false; + } + + // Mark that we're done writing. + parent._doneWriting = error ?? ChannelUtilities.s_doneWritingSentinel; + completeTask = parent._items.IsEmpty; + } + + // If there are no items in the queue, complete the channel's task, + // as no more data can possibly arrive at this point. We do this outside + // of the lock in case we'll be running synchronous completions, and we + // do it before completing blocked/waiting readers, so that when they + // wake up they'll see the task as being completed. + if (completeTask) + { + ChannelUtilities.Complete(parent._completion, error); + } + + // At this point, _blockedWriters and _waitingReaders/Writers will not be mutated: + // they're only mutated by readers/writers while holding the lock, and only if _doneWriting is null. + // We also know that only one thread (this one) will ever get here, as only that thread + // will be the one to transition from _doneWriting false to true. As such, we can + // freely manipulate them without any concurrency concerns. + ChannelUtilities.FailInteractors, VoidResult>(parent._blockedWriters, ChannelUtilities.CreateInvalidCompletionException(error)); + ChannelUtilities.WakeUpWaiters(ref parent._waitingReaders, result: false, error: error); + ChannelUtilities.WakeUpWaiters(ref parent._waitingWriters, result: false, error: error); + + // Successfully transitioned to completed. + return true; + } + + public override bool TryWrite(T item) + { + ReaderInteractor waitingReaders = null; + + BoundedChannel parent = _parent; + lock (parent.SyncObj) + { + parent.AssertInvariants(); + + // If we're done writing, nothing more to do. + if (parent._doneWriting != null) + { + return false; + } + + // Get the number of items in the channel currently. + int count = parent._items.Count; + + if (count == 0) + { + // There are no items in the channel, which means we may have waiting readers. + // Store the item. + parent._items.EnqueueTail(item); + waitingReaders = parent._waitingReaders; + if (waitingReaders == null) + { + // If no one's waiting to be notified about a 0-to-1 transition, we're done. + return true; + } + parent._waitingReaders = null; + } + else if (count < parent._bufferedCapacity) + { + // There's room in the channel. Since we're not transitioning from 0-to-1 and + // since there's room, we can simply store the item and exit without having to + // worry about blocked/waiting readers. + parent._items.EnqueueTail(item); + return true; + } + else if (parent._mode == BoundedChannelFullMode.Wait) + { + // The channel is full and we're in a wait mode. + // Simply exit and let the caller know we didn't write the data. + return false; + } + else if (parent._mode == BoundedChannelFullMode.DropWrite) + { + // The channel is full. Just ignore the item being added + // but say we added it. + return true; + } + else + { + // The channel is full, and we're in a dropping mode. + // Drop either the oldest or the newest and write the new item. + T droppedItem = parent._mode == BoundedChannelFullMode.DropNewest ? + parent._items.DequeueTail() : + parent._items.DequeueHead(); + parent._items.EnqueueTail(item); + return true; + } + } + + // We stored an item bringing the count up from 0 to 1. Alert + // any waiting readers that there may be something for them to consume. + // Since we're no longer holding the lock, it's possible we'll end up + // waking readers that have since come in. + waitingReaders.Success(item: true); + return true; + } + + public override Task WaitToWriteAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return Task.FromCanceled(cancellationToken); + } + + BoundedChannel parent = _parent; + lock (parent.SyncObj) + { + parent.AssertInvariants(); + + // If we're done writing, no writes will ever succeed. + if (parent._doneWriting != null) + { + return parent._doneWriting != ChannelUtilities.s_doneWritingSentinel ? + Task.FromException(parent._doneWriting) : + ChannelUtilities.s_falseTask; + } + + // If there's space to write, a write is possible. + // And if the mode involves dropping/ignoring, we can always write, as even if it's + // full we'll just drop an element to make room. + if (parent._items.Count < parent._bufferedCapacity || parent._mode != BoundedChannelFullMode.Wait) + { + return ChannelUtilities.s_trueTask; + } + + // We're still allowed to write, but there's no space, so ensure a waiter is queued and return it. + return ChannelUtilities.GetOrCreateWaiter(ref parent._waitingWriters, true, cancellationToken); + } + } + + public override Task WriteAsync(T item, CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return Task.FromCanceled(cancellationToken); + } + + ReaderInteractor waitingReaders = null; + + BoundedChannel parent = _parent; + lock (parent.SyncObj) + { + parent.AssertInvariants(); + + // If we're done writing, trying to write is an error. + if (parent._doneWriting != null) + { + return Task.FromException(ChannelUtilities.CreateInvalidCompletionException(parent._doneWriting)); + } + + // Get the number of items in the channel currently. + int count = parent._items.Count; + + if (count == 0) + { + // There are no items in the channel, which means we may have waiting readers. + // Store the item. + parent._items.EnqueueTail(item); + waitingReaders = parent._waitingReaders; + if (waitingReaders == null) + { + // If no one's waiting to be notified about a 0-to-1 transition, we're done. + return ChannelUtilities.s_trueTask; + } + parent._waitingReaders = null; + } + else if (count < parent._bufferedCapacity) + { + // There's room in the channel. Since we're not transitioning from 0-to-1 and + // since there's room, we can simply store the item and exit without having to + // worry about blocked/waiting readers. + parent._items.EnqueueTail(item); + return ChannelUtilities.s_trueTask; + } + else if (parent._mode == BoundedChannelFullMode.Wait) + { + // The channel is full and we're in a wait mode. + // Queue the writer. + var writer = WriterInteractor.Create(true, cancellationToken, item); + parent._blockedWriters.EnqueueTail(writer); + return writer.Task; + } + else if (parent._mode == BoundedChannelFullMode.DropWrite) + { + // The channel is full and we're in ignore mode. + // Ignore the item but say we accepted it. + return ChannelUtilities.s_trueTask; + } + else + { + // The channel is full, and we're in a dropping mode. + // Drop either the oldest or the newest and write the new item. + T droppedItem = parent._mode == BoundedChannelFullMode.DropNewest ? + parent._items.DequeueTail() : + parent._items.DequeueHead(); + parent._items.EnqueueTail(item); + return ChannelUtilities.s_trueTask; + } + } + + // We stored an item bringing the count up from 0 to 1. Alert + // any waiting readers that there may be something for them to consume. + // Since we're no longer holding the lock, it's possible we'll end up + // waking readers that have since come in. + waitingReaders.Success(item: true); + return ChannelUtilities.s_trueTask; + } + } + + [Conditional("DEBUG")] + private void AssertInvariants() + { + Debug.Assert(SyncObj != null, "The sync obj must not be null."); + Debug.Assert(Monitor.IsEntered(SyncObj), "Invariants can only be validated while holding the lock."); + + if (!_items.IsEmpty) + { + Debug.Assert(_waitingReaders == null, "There are items available, so there shouldn't be any waiting readers."); + } + if (_items.Count < _bufferedCapacity) + { + Debug.Assert(_blockedWriters.IsEmpty, "There's space available, so there shouldn't be any blocked writers."); + Debug.Assert(_waitingWriters == null, "There's space available, so there shouldn't be any waiting writers."); + } + if (!_blockedWriters.IsEmpty) + { + Debug.Assert(_items.Count == _bufferedCapacity, "We should have a full buffer if there's a blocked writer."); + } + if (_completion.Task.IsCompleted) + { + Debug.Assert(_doneWriting != null, "We can only complete if we're done writing."); + } + } + + /// Gets the number of items in the channel. This should only be used by the debugger. + private int ItemsCountForDebugger => _items.Count; + + /// Gets an enumerator the debugger can use to show the contents of the channel. + IEnumerator IDebugEnumerable.GetEnumerator() => _items.GetEnumerator(); + } +} diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/BoundedChannelFullMode.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/BoundedChannelFullMode.cs new file mode 100644 index 0000000..3852565 --- /dev/null +++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/BoundedChannelFullMode.cs @@ -0,0 +1,19 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +namespace System.Threading.Channels +{ + /// Specifies the behavior to use when writing to a bounded channel that is already full. + public enum BoundedChannelFullMode + { + /// Wait for space to be available in order to complete the write operation. + Wait, + /// Remove and ignore the newest item in the channel in order to make room for the item being written. + DropNewest, + /// Remove and ignore the oldest item in the channel in order to make room for the item being written. + DropOldest, + /// Drop the item being written. + DropWrite + } +} diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.cs new file mode 100644 index 0000000..9dedd4e --- /dev/null +++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.cs @@ -0,0 +1,76 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +namespace System.Threading.Channels +{ + /// Provides static methods for creating channels. + public static class Channel + { + /// Creates an unbounded channel usable by any number of readers and writers concurrently. + /// The created channel. + public static Channel CreateUnbounded() => + new UnboundedChannel(runContinuationsAsynchronously: true); + + /// Creates an unbounded channel subject to the provided options. + /// Specifies the type of data in the channel. + /// Options that guide the behavior of the channel. + /// The created channel. + public static Channel CreateUnbounded(UnboundedChannelOptions options) => + options == null ? throw new ArgumentOutOfRangeException(nameof(options)) : + options.SingleReader ? new SingleConsumerUnboundedChannel(!options.AllowSynchronousContinuations) : + (Channel)new UnboundedChannel(!options.AllowSynchronousContinuations); + + /// Creates a channel with the specified maximum capacity. + /// Specifies the type of data in the channel. + /// The maximum number of items the channel may store. + /// The created channel. + /// + /// Channels created with this method apply the + /// behavior and prohibit continuations from running synchronously. + /// + public static Channel CreateBounded(int capacity) + { + if (capacity < 1) + { + throw new ArgumentOutOfRangeException(nameof(capacity)); + } + + return new BoundedChannel(capacity, BoundedChannelFullMode.Wait, runContinuationsAsynchronously: true); + } + + /// Creates a channel with the specified maximum capacity. + /// Specifies the type of data in the channel. + /// Options that guide the behavior of the channel. + /// The created channel. + public static Channel CreateBounded(BoundedChannelOptions options) + { + if (options == null) + { + throw new ArgumentOutOfRangeException(nameof(options)); + } + + return new BoundedChannel(options.Capacity, options.FullMode, !options.AllowSynchronousContinuations); + } + + /// Creates a channel that doesn't buffer any items. + /// Specifies the type of data in the channel. + /// The created channel. + public static Channel CreateUnbuffered() => + new UnbufferedChannel(); + + /// Creates a channel that doesn't buffer any items. + /// Specifies the type of data in the channel. + /// Options that guide the behavior of the channel. + /// The created channel. + public static Channel CreateUnbuffered(UnbufferedChannelOptions options) + { + if (options == null) + { + throw new ArgumentOutOfRangeException(nameof(options)); + } + + return new UnbufferedChannel(); + } + } +} diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/ChannelClosedException.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/ChannelClosedException.cs new file mode 100644 index 0000000..fe9efc6 --- /dev/null +++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/ChannelClosedException.cs @@ -0,0 +1,28 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +namespace System.Threading.Channels +{ + /// Exception thrown when a channel is used after it's been closed. + public class ChannelClosedException : InvalidOperationException + { + /// Initializes a new instance of the class. + public ChannelClosedException() : + base(SR.ChannelClosedException_DefaultMessage) { } + + /// Initializes a new instance of the class. + /// The message that describes the error. + public ChannelClosedException(string message) : base(message) { } + + /// Initializes a new instance of the class. + /// The exception that is the cause of this exception. + public ChannelClosedException(Exception innerException) : + base(SR.ChannelClosedException_DefaultMessage, innerException) { } + + /// Initializes a new instance of the class. + /// The message that describes the error. + /// The exception that is the cause of this exception. + public ChannelClosedException(string message, Exception innerException) : base(message, innerException) { } + } +} diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/ChannelOptions.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/ChannelOptions.cs new file mode 100644 index 0000000..9172889 --- /dev/null +++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/ChannelOptions.cs @@ -0,0 +1,107 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +namespace System.Threading.Channels +{ + /// Provides options that control the behavior of channel instances. + public abstract class ChannelOptions + { + /// + /// true if writers to the channel guarantee that there will only ever be at most one write operation + /// at a time; false if no such constraint is guaranteed. + /// + /// + /// If true, the channel may be able to optimize certain operations based on knowing about the single-writer guarantee. + /// The default is false. + /// + public bool SingleWriter { get; set; } + + /// + /// true readers from the channel guarantee that there will only ever be at most one read operation at a time; + /// false if no such constraint is guaranteed. + /// + /// + /// If true, the channel may be able to optimize certain operations based on knowing about the single-reader guarantee. + /// The default is false. + /// + public bool SingleReader { get; set; } + + /// + /// true if operations performed on a channel may synchronously invoke continuations subscribed to + /// notifications of pending async operations; false if all continuations should be invoked asynchronously. + /// + /// + /// Setting this option to true can provide measurable throughput improvements by avoiding + /// scheduling additional work items. However, it may come at the cost of reduced parallelism, as for example a producer + /// may then be the one to execute work associated with a consumer, and if not done thoughtfully, this can lead + /// to unexpected interactions. The default is false. + /// + public bool AllowSynchronousContinuations { get; set; } + } + + /// Provides options that control the behavior of instances. + public sealed class BoundedChannelOptions : ChannelOptions + { + /// The maximum number of items the bounded channel may store. + private int _capacity; + /// The behavior incurred by write operations when the channel is full. + private BoundedChannelFullMode _mode = BoundedChannelFullMode.Wait; + + /// Initializes the options. + /// The maximum number of items the bounded channel may store. + public BoundedChannelOptions(int capacity) + { + if (capacity < 1) + { + throw new ArgumentOutOfRangeException(nameof(capacity)); + } + + Capacity = capacity; + } + + /// Gets or sets the maximum number of items the bounded channel may store. + public int Capacity + { + get => _capacity; + set + { + if (value < 1) + { + throw new ArgumentOutOfRangeException(nameof(value)); + } + _capacity = value; + } + } + + /// Gets or sets the behavior incurred by write operations when the channel is full. + public BoundedChannelFullMode FullMode + { + get => _mode; + set + { + switch (value) + { + case BoundedChannelFullMode.Wait: + case BoundedChannelFullMode.DropNewest: + case BoundedChannelFullMode.DropOldest: + case BoundedChannelFullMode.DropWrite: + _mode = value; + break; + default: + throw new ArgumentOutOfRangeException(nameof(value)); + } + } + } + } + + /// Provides options that control the behavior of instances. + public sealed class UnboundedChannelOptions : ChannelOptions + { + } + + /// Provides options that control the behavior of instances. + public sealed class UnbufferedChannelOptions : ChannelOptions + { + } +} diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/ChannelReader.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/ChannelReader.cs new file mode 100644 index 0000000..a5d7d80 --- /dev/null +++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/ChannelReader.cs @@ -0,0 +1,34 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.Threading.Tasks; + +namespace System.Threading.Channels +{ + /// + /// Provides a base class for reading from a channel. + /// + /// Specifies the type of data that may be read from the channel. + public abstract class ChannelReader + { + /// + /// Gets a that completes when no more data will ever + /// be available to be read from this channel. + /// + public virtual Task Completion => ChannelUtilities.s_neverCompletingTask; + + /// Attempts to read an item to the channel. + /// The read item, or a default value if no item could be read. + /// true if an item was read; otherwise, false if no item was read. + public abstract bool TryRead(out T item); + + /// Returns a that will complete when data is available to read. + /// A used to cancel the wait operation. + /// + /// A that will complete with a true result when data is available to read + /// or with a false result when no further data will ever be available to be read. + /// + public abstract Task WaitToReadAsync(CancellationToken cancellationToken = default(CancellationToken)); + } +} diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/ChannelUtilities.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/ChannelUtilities.cs new file mode 100644 index 0000000..a7411c4 --- /dev/null +++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/ChannelUtilities.cs @@ -0,0 +1,138 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.Collections.Generic; +using System.Diagnostics; +using System.Threading.Tasks; + +namespace System.Threading.Channels +{ + /// Provides internal helper methods for implementing channels. + internal static class ChannelUtilities + { + /// Sentinel object used to indicate being done writing. + internal static readonly Exception s_doneWritingSentinel = new Exception(nameof(s_doneWritingSentinel)); + /// A cached task with a Boolean true result. + internal static readonly Task s_trueTask = Task.FromResult(true); + /// A cached task with a Boolean false result. + internal static readonly Task s_falseTask = Task.FromResult(false); + /// A cached task that never completes. + internal static readonly Task s_neverCompletingTask = new TaskCompletionSource().Task; + + /// Completes the specified TaskCompletionSource. + /// The source to complete. + /// + /// The optional exception with which to complete. + /// If this is null or the DoneWritingSentinel, the source will be completed successfully. + /// If this is an OperationCanceledException, it'll be completed with the exception's token. + /// Otherwise, it'll be completed as faulted with the exception. + /// + internal static void Complete(TaskCompletionSource tcs, Exception error = null) + { + if (error is OperationCanceledException oce) + { + tcs.TrySetCanceled(oce.CancellationToken); + } + else if (error != null && error != s_doneWritingSentinel) + { + tcs.TrySetException(error); + } + else + { + tcs.TrySetResult(default(VoidResult)); + } + } + + /// Gets a value task representing an error. + /// Specifies the type of the value that would have been returned. + /// The error. This may be . + /// The failed task. + internal static ValueTask GetInvalidCompletionValueTask(Exception error) + { + Debug.Assert(error != null); + + Task t = + error == s_doneWritingSentinel ? Task.FromException(CreateInvalidCompletionException()) : + error is OperationCanceledException oce ? Task.FromCanceled(oce.CancellationToken.IsCancellationRequested ? oce.CancellationToken : new CancellationToken(true)) : + Task.FromException(CreateInvalidCompletionException(error)); + + return new ValueTask(t); + } + + /// Wake up all of the waiters and null out the field. + /// The waiters. + /// The value with which to complete each waiter. + internal static void WakeUpWaiters(ref ReaderInteractor waiters, bool result) + { + ReaderInteractor w = waiters; + if (w != null) + { + w.Success(result); + waiters = null; + } + } + + /// Wake up all of the waiters and null out the field. + /// The waiters. + /// The success value with which to complete each waiter if error is null. + /// The failure with which to cmplete each waiter, if non-null. + internal static void WakeUpWaiters(ref ReaderInteractor waiters, bool result, Exception error = null) + { + ReaderInteractor w = waiters; + if (w != null) + { + if (error != null) + { + w.Fail(error); + } + else + { + w.Success(result); + } + waiters = null; + } + } + + /// Removes all interactors from the queue, failing each. + /// The queue of interactors to complete. + /// The error with which to complete each interactor. + internal static void FailInteractors(Dequeue interactors, Exception error) where T : Interactor + { + while (!interactors.IsEmpty) + { + interactors.DequeueHead().Fail(error ?? CreateInvalidCompletionException()); + } + } + + /// Gets or creates a "waiter" (e.g. WaitForRead/WriteAsync) interactor. + /// The field storing the waiter interactor. + /// true to force continuations to run asynchronously; otherwise, false. + /// The token to use to cancel the wait. + internal static Task GetOrCreateWaiter(ref ReaderInteractor waiter, bool runContinuationsAsynchronously, CancellationToken cancellationToken) + { + // Get the existing waiters interactor. + ReaderInteractor w = waiter; + + // If there isn't one, create one. This explicitly does not include the cancellation token, + // as we reuse it for any number of waiters that overlap. + if (w == null) + { + waiter = w = ReaderInteractor.Create(runContinuationsAsynchronously); + } + + // If the cancellation token can't be canceled, then just return the waiter task. + // If it can, we need to return a task that will complete when the waiter task does but that can also be canceled. + // Easiest way to do that is with a cancelable continuation. + return cancellationToken.CanBeCanceled ? + w.Task.ContinueWith(t => t.Result, cancellationToken, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default) : + w.Task; + } + + /// Creates and returns an exception object to indicate that a channel has been closed. + internal static Exception CreateInvalidCompletionException(Exception inner = null) => + inner is OperationCanceledException ? inner : + inner != null && inner != s_doneWritingSentinel ? new ChannelClosedException(inner) : + new ChannelClosedException(); + } +} diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/ChannelWriter.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/ChannelWriter.cs new file mode 100644 index 0000000..e96c1ff --- /dev/null +++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/ChannelWriter.cs @@ -0,0 +1,79 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.Threading.Tasks; + +namespace System.Threading.Channels +{ + /// + /// Provides a base class for writing to a channel. + /// + /// Specifies the type of data that may be written to the channel. + public abstract class ChannelWriter + { + /// Attempts to mark the channel as being completed, meaning no more data will be written to it. + /// An indicating the failure causing no more data to be written, or null for success. + /// + /// true if this operation successfully completes the channel; otherwise, false if the channel could not be marked for completion, + /// for example due to having already been marked as such, or due to not supporting completion. + /// + public virtual bool TryComplete(Exception error = null) => false; + + /// Attempts to write the specified item to the channel. + /// The item to write. + /// true if the item was written; otherwise, false if it wasn't written. + public abstract bool TryWrite(T item); + + /// Returns a that will complete when space is available to write an item. + /// A used to cancel the wait operation. + /// + /// A that will complete with a true result when space is available to write an item + /// or with a false result when no further writing will be permitted. + /// + public abstract Task WaitToWriteAsync(CancellationToken cancellationToken = default(CancellationToken)); + + /// Asynchronously writes an item to the channel. + /// The value to write to the channel. + /// A used to cancel the write operation. + /// A that represents the asynchronous write operation. + public virtual Task WriteAsync(T item, CancellationToken cancellationToken = default(CancellationToken)) + { + try + { + return + cancellationToken.IsCancellationRequested ? Task.FromCanceled(cancellationToken) : + TryWrite(item) ? Task.CompletedTask : + WriteAsyncCore(item, cancellationToken); + } + catch (Exception e) + { + return Task.FromException(e); + } + + async Task WriteAsyncCore(T innerItem, CancellationToken ct) + { + while (await WaitToWriteAsync(ct).ConfigureAwait(false)) + { + if (TryWrite(innerItem)) + { + return; + } + } + + throw ChannelUtilities.CreateInvalidCompletionException(); + } + } + + /// Mark the channel as being complete, meaning no more items will be written to it. + /// Optional Exception indicating a failure that's causing the channel to complete. + /// The channel has already been marked as complete. + public void Complete(Exception error = null) + { + if (!TryComplete(error)) + { + throw ChannelUtilities.CreateInvalidCompletionException(); + } + } + } +} diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel_1.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel_1.cs new file mode 100644 index 0000000..c10dea34 --- /dev/null +++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel_1.cs @@ -0,0 +1,10 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +namespace System.Threading.Channels +{ + /// Provides a base class for channels that support reading and writing elements of type . + /// Specifies the type of data readable and writable in the channel. + public abstract class Channel : Channel { } +} diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel_2.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel_2.cs new file mode 100644 index 0000000..d8e2b28 --- /dev/null +++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel_2.cs @@ -0,0 +1,29 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +namespace System.Threading.Channels +{ + /// + /// Provides a base class for channels that support reading elements of type + /// and writing elements of type . + /// + /// Specifies the type of data that may be written to the channel. + /// Specifies the type of data that may be read from the channel. + public abstract class Channel + { + /// Gets the readable half of this channel. + public ChannelReader Reader { get; protected set; } + + /// Gets the writable half of this channel. + public ChannelWriter Writer { get; protected set; } + + /// Implicit cast from a channel to its readable half. + /// The channel being cast. + public static implicit operator ChannelReader(Channel channel) => channel.Reader; + + /// Implicit cast from a channel to its writable half. + /// The channel being cast. + public static implicit operator ChannelWriter(Channel channel) => channel.Writer; + } +} diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/IDebugEnumerator.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/IDebugEnumerator.cs new file mode 100644 index 0000000..d83eb78 --- /dev/null +++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/IDebugEnumerator.cs @@ -0,0 +1,30 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.Collections.Generic; +using System.Diagnostics; + +namespace System.Threading.Channels +{ + interface IDebugEnumerable + { + IEnumerator GetEnumerator(); + } + + internal sealed class DebugEnumeratorDebugView + { + public DebugEnumeratorDebugView(IDebugEnumerable enumerable) + { + var list = new List(); + foreach (T item in enumerable) + { + list.Add(item); + } + Items = list.ToArray(); + } + + [DebuggerBrowsable(DebuggerBrowsableState.RootHidden)] + public T[] Items { get; } + } +} diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Interactor.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Interactor.cs new file mode 100644 index 0000000..cac9aaa --- /dev/null +++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Interactor.cs @@ -0,0 +1,101 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.Threading.Tasks; + +namespace System.Threading.Channels +{ + internal abstract class Interactor : TaskCompletionSource + { + protected Interactor(bool runContinuationsAsynchronously) : + base(runContinuationsAsynchronously ? TaskCreationOptions.RunContinuationsAsynchronously : TaskCreationOptions.None) { } + + internal bool Success(T item) + { + UnregisterCancellation(); + return TrySetResult(item); + } + + internal bool Fail(Exception exception) + { + UnregisterCancellation(); + return TrySetException(exception); + } + + internal virtual void UnregisterCancellation() { } + } + + internal class ReaderInteractor : Interactor + { + protected ReaderInteractor(bool runContinuationsAsynchronously) : base(runContinuationsAsynchronously) { } + + public static ReaderInteractor Create(bool runContinuationsAsynchronously) => + new ReaderInteractor(runContinuationsAsynchronously); + + public static ReaderInteractor Create(bool runContinuationsAsynchronously, CancellationToken cancellationToken) => + cancellationToken.CanBeCanceled ? + new CancelableReaderInteractor(runContinuationsAsynchronously, cancellationToken) : + new ReaderInteractor(runContinuationsAsynchronously); + } + + internal class WriterInteractor : Interactor + { + protected WriterInteractor(bool runContinuationsAsynchronously) : base(runContinuationsAsynchronously) { } + + internal T Item { get; private set; } + + public static WriterInteractor Create(bool runContinuationsAsynchronously, CancellationToken cancellationToken, T item) + { + WriterInteractor w = cancellationToken.CanBeCanceled ? + new CancelableWriter(runContinuationsAsynchronously, cancellationToken) : + new WriterInteractor(runContinuationsAsynchronously); + w.Item = item; + return w; + } + } + + internal sealed class CancelableReaderInteractor : ReaderInteractor + { + private CancellationToken _token; + private CancellationTokenRegistration _registration; + + internal CancelableReaderInteractor(bool runContinuationsAsynchronously, CancellationToken cancellationToken) : base(runContinuationsAsynchronously) + { + _token = cancellationToken; + _registration = cancellationToken.Register(s => + { + var thisRef = (CancelableReaderInteractor)s; + thisRef.TrySetCanceled(thisRef._token); + }, this); + } + + internal override void UnregisterCancellation() + { + _registration.Dispose(); + _registration = default(CancellationTokenRegistration); + } + } + + internal sealed class CancelableWriter : WriterInteractor + { + private CancellationToken _token; + private CancellationTokenRegistration _registration; + + internal CancelableWriter(bool runContinuationsAsynchronously, CancellationToken cancellationToken) : base(runContinuationsAsynchronously) + { + _token = cancellationToken; + _registration = cancellationToken.Register(s => + { + var thisRef = (CancelableWriter)s; + thisRef.TrySetCanceled(thisRef._token); + }, this); + } + + internal override void UnregisterCancellation() + { + _registration.Dispose(); + _registration = default(CancellationTokenRegistration); + } + } +} diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/SingleConsumerUnboundedChannel.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/SingleConsumerUnboundedChannel.cs new file mode 100644 index 0000000..294b588 --- /dev/null +++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/SingleConsumerUnboundedChannel.cs @@ -0,0 +1,236 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.Threading.Tasks; + +namespace System.Threading.Channels +{ + /// + /// Provides a buffered channel of unbounded capacity for use by any number + /// of writers but at most a single reader at a time. + /// + [DebuggerDisplay("Items={ItemsCountForDebugger}")] + [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<>))] + internal sealed class SingleConsumerUnboundedChannel : Channel, IDebugEnumerable + { + /// Task that indicates the channel has completed. + private readonly TaskCompletionSource _completion; + /// + /// A concurrent queue to hold the items for this channel. The queue itself supports at most + /// one writer and one reader at a time; as a result, since this channel supports multiple writers, + /// all write access to the queue must be synchronized by the channel. + /// + private readonly SingleProducerSingleConsumerQueue _items = new SingleProducerSingleConsumerQueue(); + /// Whether to force continuations to be executed asynchronously from producer writes. + private readonly bool _runContinuationsAsynchronously; + + /// non-null if the channel has been marked as complete for writing. + private volatile Exception _doneWriting; + + /// A waiting reader (e.g. WaitForReadAsync) if there is one. + private ReaderInteractor _waitingReader; + + /// Initialize the channel. + /// Whether to force continuations to be executed asynchronously. + internal SingleConsumerUnboundedChannel(bool runContinuationsAsynchronously) + { + _runContinuationsAsynchronously = runContinuationsAsynchronously; + _completion = new TaskCompletionSource(runContinuationsAsynchronously ? TaskCreationOptions.RunContinuationsAsynchronously : TaskCreationOptions.None); + + Reader = new UnboundedChannelReader(this); + Writer = new UnboundedChannelWriter(this); + } + + private sealed class UnboundedChannelReader : ChannelReader + { + internal readonly SingleConsumerUnboundedChannel _parent; + internal UnboundedChannelReader(SingleConsumerUnboundedChannel parent) => _parent = parent; + + public override Task Completion => _parent._completion.Task; + + public override bool TryRead(out T item) + { + SingleConsumerUnboundedChannel parent = _parent; + if (parent._items.TryDequeue(out item)) + { + if (parent._doneWriting != null && parent._items.IsEmpty) + { + ChannelUtilities.Complete(parent._completion, parent._doneWriting); + } + return true; + } + return false; + } + + public override Task WaitToReadAsync(CancellationToken cancellationToken) + { + // Outside of the lock, check if there are any items waiting to be read. If there are, we're done. + return !_parent._items.IsEmpty ? + ChannelUtilities.s_trueTask : + WaitToReadAsyncCore(cancellationToken); + + Task WaitToReadAsyncCore(CancellationToken ct) + { + // Now check for cancellation. + if (ct.IsCancellationRequested) + { + return Task.FromCanceled(ct); + } + + SingleConsumerUnboundedChannel parent = _parent; + ReaderInteractor oldWaiter = null, newWaiter; + lock (parent.SyncObj) + { + // Again while holding the lock, check to see if there are any items available. + if (!parent._items.IsEmpty) + { + return ChannelUtilities.s_trueTask; + } + + // There aren't any items; if we're done writing, there never will be more items. + if (parent._doneWriting != null) + { + return parent._doneWriting != ChannelUtilities.s_doneWritingSentinel ? + Task.FromException(parent._doneWriting) : + ChannelUtilities.s_falseTask; + } + + // Create the new waiter. We're a bit more tolerant of a stray waiting reader + // than we are of a blocked reader, as with usage patterns it's easier to leave one + // behind, so we just cancel any that may have been waiting around. + oldWaiter = parent._waitingReader; + parent._waitingReader = newWaiter = ReaderInteractor.Create(parent._runContinuationsAsynchronously, ct); + } + + oldWaiter?.TrySetCanceled(); + return newWaiter.Task; + } + } + } + + private sealed class UnboundedChannelWriter : ChannelWriter + { + internal readonly SingleConsumerUnboundedChannel _parent; + internal UnboundedChannelWriter(SingleConsumerUnboundedChannel parent) => _parent = parent; + + public override bool TryComplete(Exception error) + { + ReaderInteractor waitingReader = null; + bool completeTask = false; + + SingleConsumerUnboundedChannel parent = _parent; + lock (parent.SyncObj) + { + // If we're already marked as complete, there's nothing more to do. + if (parent._doneWriting != null) + { + return false; + } + + // Mark as complete for writing. + parent._doneWriting = error ?? ChannelUtilities.s_doneWritingSentinel; + + // If we have no more items remaining, then the channel needs to be marked as completed + // and readers need to be informed they'll never get another item. All of that needs + // to happen outside of the lock to avoid invoking continuations under the lock. + if (parent._items.IsEmpty) + { + completeTask = true; + + if (parent._waitingReader != null) + { + waitingReader = parent._waitingReader; + parent._waitingReader = null; + } + } + } + + // Complete the channel task if necessary + if (completeTask) + { + ChannelUtilities.Complete(parent._completion, error); + } + + // Complete a waiting reader if necessary. + if (waitingReader != null) + { + if (error != null) + { + waitingReader.Fail(error); + } + else + { + waitingReader.Success(false); + } + } + + // Successfully completed the channel + return true; + } + + public override bool TryWrite(T item) + { + SingleConsumerUnboundedChannel parent = _parent; + while (true) // in case a reader was canceled and we need to try again + { + ReaderInteractor waitingReader = null; + + lock (parent.SyncObj) + { + // If writing is completed, exit out without writing. + if (parent._doneWriting != null) + { + return false; + } + + // Queue the item being written; then if there's a waiting + // reader, store it for notification outside of the lock. + parent._items.Enqueue(item); + + waitingReader = parent._waitingReader; + if (waitingReader == null) + { + return true; + } + parent._waitingReader = null; + } + + // If we get here, we grabbed a waiting reader. + // Notify it that an item was written and exit. + Debug.Assert(waitingReader != null, "Expected a waiting reader"); + waitingReader.Success(true); + return true; + } + } + + public override Task WaitToWriteAsync(CancellationToken cancellationToken) + { + Exception doneWriting = _parent._doneWriting; + return + doneWriting == null ? ChannelUtilities.s_trueTask : + cancellationToken.IsCancellationRequested ? Task.FromCanceled(cancellationToken) : + doneWriting != ChannelUtilities.s_doneWritingSentinel ? Task.FromException(doneWriting) : + ChannelUtilities.s_falseTask; + } + + public override Task WriteAsync(T item, CancellationToken cancellationToken) => + // Writing always succeeds (unless we've already completed writing or cancellation has been requested), + // so just TryWrite and return a completed task. + TryWrite(item) ? Task.CompletedTask : + cancellationToken.IsCancellationRequested ? Task.FromCanceled(cancellationToken) : + Task.FromException(ChannelUtilities.CreateInvalidCompletionException(_parent._doneWriting)); + } + + private object SyncObj => _items; + + /// Gets the number of items in the channel. This should only be used by the debugger. + private int ItemsCountForDebugger => _items.Count; + + /// Gets an enumerator the debugger can use to show the contents of the channel. + IEnumerator IDebugEnumerable.GetEnumerator() => _items.GetEnumerator(); + } +} diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnboundedChannel.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnboundedChannel.cs new file mode 100644 index 0000000..4391367 --- /dev/null +++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnboundedChannel.cs @@ -0,0 +1,232 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.Threading.Tasks; + +namespace System.Threading.Channels +{ + /// Provides a buffered channel of unbounded capacity. + [DebuggerDisplay("Items={ItemsCountForDebugger}")] + [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<>))] + internal sealed class UnboundedChannel : Channel, IDebugEnumerable + { + /// Task that indicates the channel has completed. + private readonly TaskCompletionSource _completion; + /// The items in the channel. + private readonly ConcurrentQueue _items = new ConcurrentQueue(); + /// Whether to force continuations to be executed asynchronously from producer writes. + private readonly bool _runContinuationsAsynchronously; + + /// Readers waiting for a notification that data is available. + private ReaderInteractor _waitingReaders; + /// Set to non-null once Complete has been called. + private Exception _doneWriting; + + /// Initialize the channel. + internal UnboundedChannel(bool runContinuationsAsynchronously) + { + _runContinuationsAsynchronously = runContinuationsAsynchronously; + _completion = new TaskCompletionSource(runContinuationsAsynchronously ? TaskCreationOptions.RunContinuationsAsynchronously : TaskCreationOptions.None); + base.Reader = new UnboundedChannelReader(this); + Writer = new UnboundedChannelWriter(this); + } + + private sealed class UnboundedChannelReader : ChannelReader + { + internal readonly UnboundedChannel _parent; + internal UnboundedChannelReader(UnboundedChannel parent) => _parent = parent; + + public override Task Completion => _parent._completion.Task; + + public override bool TryRead(out T item) + { + UnboundedChannel parent = _parent; + + // Dequeue an item if we can + if (parent._items.TryDequeue(out item)) + { + if (parent._doneWriting != null && parent._items.IsEmpty) + { + // If we've now emptied the items queue and we're not getting any more, complete. + ChannelUtilities.Complete(parent._completion, parent._doneWriting); + } + return true; + } + + item = default; + return false; + } + + public override Task WaitToReadAsync(CancellationToken cancellationToken) + { + // If there are any items, readers can try to get them. + return !_parent._items.IsEmpty ? + ChannelUtilities.s_trueTask : + WaitToReadAsyncCore(cancellationToken); + + Task WaitToReadAsyncCore(CancellationToken ct) + { + UnboundedChannel parent = _parent; + + lock (parent.SyncObj) + { + parent.AssertInvariants(); + + // Try again to read now that we're synchronized with writers. + if (!parent._items.IsEmpty) + { + return ChannelUtilities.s_trueTask; + } + + // There are no items, so if we're done writing, there's never going to be data available. + if (parent._doneWriting != null) + { + return parent._doneWriting != ChannelUtilities.s_doneWritingSentinel ? + Task.FromException(parent._doneWriting) : + ChannelUtilities.s_falseTask; + } + + // Queue the waiter + return ChannelUtilities.GetOrCreateWaiter(ref parent._waitingReaders, parent._runContinuationsAsynchronously, ct); + } + } + } + } + + private sealed class UnboundedChannelWriter : ChannelWriter + { + internal readonly UnboundedChannel _parent; + internal UnboundedChannelWriter(UnboundedChannel parent) => _parent = parent; + + public override bool TryComplete(Exception error) + { + UnboundedChannel parent = _parent; + bool completeTask; + + lock (parent.SyncObj) + { + parent.AssertInvariants(); + + // If we've already marked the channel as completed, bail. + if (parent._doneWriting != null) + { + return false; + } + + // Mark that we're done writing. + parent._doneWriting = error ?? ChannelUtilities.s_doneWritingSentinel; + completeTask = parent._items.IsEmpty; + } + + // If there are no items in the queue, complete the channel's task, + // as no more data can possibly arrive at this point. We do this outside + // of the lock in case we'll be running synchronous completions, and we + // do it before completing blocked/waiting readers, so that when they + // wake up they'll see the task as being completed. + if (completeTask) + { + ChannelUtilities.Complete(parent._completion, error); + } + + // At this point, _waitingReaders will not be mutated: + // it's only mutated by readers while holding the lock, and only if _doneWriting is null. + // We also know that only one thread (this one) will ever get here, as only that thread + // will be the one to transition from _doneWriting false to true. As such, we can + // freely manipulate _waitingReaders without any concurrency concerns. + ChannelUtilities.WakeUpWaiters(ref parent._waitingReaders, result: false, error: error); + + // Successfully transitioned to completed. + return true; + } + + public override bool TryWrite(T item) + { + UnboundedChannel parent = _parent; + while (true) + { + ReaderInteractor waitingReaders = null; + lock (parent.SyncObj) + { + // If writing has already been marked as done, fail the write. + parent.AssertInvariants(); + if (parent._doneWriting != null) + { + return false; + } + + // Add the data to the queue, and let any waiting readers know that they should try to read it. + // We can only complete such waiters here under the lock if they run continuations asynchronously + // (otherwise the synchronous continuations could be invoked under the lock). If we don't complete + // them here, we need to do so outside of the lock. + parent._items.Enqueue(item); + waitingReaders = parent._waitingReaders; + if (waitingReaders == null) + { + return true; + } + parent._waitingReaders = null; + } + + // Wake up all of the waiters. Since we've released the lock, it's possible + // we could cause some spurious wake-ups here, if we tell a waiter there's + // something available but all data has already been removed. It's a benign + // race condition, though, as consumers already need to account for such things. + waitingReaders.Success(item: true); + return true; + } + } + + public override Task WaitToWriteAsync(CancellationToken cancellationToken) + { + Exception doneWriting = _parent._doneWriting; + return + doneWriting == null ? ChannelUtilities.s_trueTask : // unbounded writing can always be done if we haven't completed + cancellationToken.IsCancellationRequested ? Task.FromCanceled(cancellationToken) : + doneWriting != ChannelUtilities.s_doneWritingSentinel ? Task.FromException(doneWriting) : + ChannelUtilities.s_falseTask; + } + + public override Task WriteAsync(T item, CancellationToken cancellationToken) => + TryWrite(item) ? ChannelUtilities.s_trueTask : + cancellationToken.IsCancellationRequested ? Task.FromCanceled(cancellationToken) : + Task.FromException(ChannelUtilities.CreateInvalidCompletionException(_parent._doneWriting)); + } + + /// Gets the object used to synchronize access to all state on this instance. + private object SyncObj => _items; + + [Conditional("DEBUG")] + private void AssertInvariants() + { + Debug.Assert(SyncObj != null, "The sync obj must not be null."); + Debug.Assert(Monitor.IsEntered(SyncObj), "Invariants can only be validated while holding the lock."); + + if (!_items.IsEmpty) + { + if (_runContinuationsAsynchronously) + { + Debug.Assert(_waitingReaders == null, "There's data available, so there shouldn't be any waiting readers."); + } + Debug.Assert(!_completion.Task.IsCompleted, "We still have data available, so shouldn't be completed."); + } + if (_waitingReaders != null && _runContinuationsAsynchronously) + { + Debug.Assert(_items.IsEmpty, "There are blocked/waiting readers, so there shouldn't be any data available."); + } + if (_completion.Task.IsCompleted) + { + Debug.Assert(_doneWriting != null, "We're completed, so we must be done writing."); + } + } + + /// Gets the number of items in the channel. This should only be used by the debugger. + private int ItemsCountForDebugger => _items.Count; + + /// Gets an enumerator the debugger can use to show the contents of the channel. + IEnumerator IDebugEnumerable.GetEnumerator() => _items.GetEnumerator(); + } +} diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnbufferedChannel.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnbufferedChannel.cs new file mode 100644 index 0000000..ba94e33 --- /dev/null +++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnbufferedChannel.cs @@ -0,0 +1,217 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.Collections.Generic; +using System.Diagnostics; +using System.Threading.Tasks; + +namespace System.Threading.Channels +{ + /// Provides an unbuffered channel, such that a reader and a writer must rendezvous to succeed. + [DebuggerDisplay("Blocked Writers: {BlockedWritersCountForDebugger}, Waiting Readers: {WaitingReadersForDebugger}")] + [DebuggerTypeProxy(typeof(UnbufferedChannel<>.DebugView))] + internal sealed class UnbufferedChannel : Channel + { + /// Task that represents the completion of the channel. + private readonly TaskCompletionSource _completion = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + /// A queue of writers blocked waiting to be matched with a reader. + private readonly Dequeue> _blockedWriters = new Dequeue>(); + + /// Task signaled when any WaitToReadAsync waiters should be woken up. + private ReaderInteractor _waitingReaders; + + private sealed class UnbufferedChannelReader : ChannelReader + { + internal readonly UnbufferedChannel _parent; + internal UnbufferedChannelReader(UnbufferedChannel parent) => _parent = parent; + + public override Task Completion => _parent._completion.Task; + + public override bool TryRead(out T item) + { + UnbufferedChannel parent = _parent; + lock (parent.SyncObj) + { + parent.AssertInvariants(); + + // Try to find a writer to pair with + while (!parent._blockedWriters.IsEmpty) + { + WriterInteractor w = parent._blockedWriters.DequeueHead(); + if (w.Success(default(VoidResult))) + { + item = w.Item; + return true; + } + } + } + + // None found + item = default; + return false; + } + + public override Task WaitToReadAsync(CancellationToken cancellationToken) + { + UnbufferedChannel parent = _parent; + lock (parent.SyncObj) + { + // If we're done writing, fail. + if (parent._completion.Task.IsCompleted) + { + return parent._completion.Task.IsFaulted ? + Task.FromException(parent._completion.Task.Exception.InnerException) : + ChannelUtilities.s_falseTask; + } + + // If there's a blocked writer, we can read. + if (!parent._blockedWriters.IsEmpty) + { + return ChannelUtilities.s_trueTask; + } + + // Otherwise, queue the waiter. + return ChannelUtilities.GetOrCreateWaiter(ref parent._waitingReaders, true, cancellationToken); + } + } + } + + private sealed class UnbufferedChannelWriter : ChannelWriter + { + internal readonly UnbufferedChannel _parent; + internal UnbufferedChannelWriter(UnbufferedChannel parent) => _parent = parent; + + public override bool TryComplete(Exception error) + { + UnbufferedChannel parent = _parent; + lock (parent.SyncObj) + { + parent.AssertInvariants(); + + // Mark the channel as being done. Since there's no buffered data, we can complete immediately. + if (parent._completion.Task.IsCompleted) + { + return false; + } + ChannelUtilities.Complete(parent._completion, error); + + // Fail any blocked writers, as there will be no readers to pair them with. + if (parent._blockedWriters.Count > 0) + { + ChannelUtilities.FailInteractors, VoidResult>(parent._blockedWriters, ChannelUtilities.CreateInvalidCompletionException(error)); + } + + // Let any waiting readers know there won't be any more data. + ChannelUtilities.WakeUpWaiters(ref parent._waitingReaders, result: false, error: error); + } + + return true; + } + + public override bool TryWrite(T item) + { + // TryWrite on an UnbufferedChannel can never succeed, as there aren't + // any readers that are able to wait-and-read atomically + return false; + } + + public override Task WaitToWriteAsync(CancellationToken cancellationToken) + { + UnbufferedChannel parent = _parent; + + // If we're done writing, fail. + if (parent._completion.Task.IsCompleted) + { + return parent._completion.Task.IsFaulted ? + Task.FromException(parent._completion.Task.Exception.InnerException) : + ChannelUtilities.s_falseTask; + } + + // Otherwise, just return a task suggesting a write be attempted. + // Since there's no "ReadAsync", there's nothing to wait for. + return ChannelUtilities.s_trueTask; + } + + public override Task WriteAsync(T item, CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return Task.FromCanceled(cancellationToken); + } + + UnbufferedChannel parent = _parent; + lock (parent.SyncObj) + { + // Fail if we've already completed. + if (parent._completion.Task.IsCompleted) + { + return + parent._completion.Task.IsCanceled ? Task.FromCanceled(new CancellationToken(true)) : + Task.FromException( + parent._completion.Task.IsFaulted ? + ChannelUtilities.CreateInvalidCompletionException(parent._completion.Task.Exception.InnerException) : + ChannelUtilities.CreateInvalidCompletionException()); + } + + // Queue the writer. + var w = WriterInteractor.Create(true, cancellationToken, item); + parent._blockedWriters.EnqueueTail(w); + + // And let any waiting readers know it's their lucky day. + ChannelUtilities.WakeUpWaiters(ref parent._waitingReaders, result: true); + + return w.Task; + } + } + } + + /// Initialize the channel. + internal UnbufferedChannel() + { + base.Reader = new UnbufferedChannelReader(this); + Writer = new UnbufferedChannelWriter(this); + } + + /// Gets an object used to synchronize all state on the instance. + private object SyncObj => _completion; + + [Conditional("DEBUG")] + private void AssertInvariants() + { + Debug.Assert(SyncObj != null, "The sync obj must not be null."); + Debug.Assert(Monitor.IsEntered(SyncObj), "Invariants can only be validated while holding the lock."); + + if (_completion.Task.IsCompleted) + { + Debug.Assert(_blockedWriters.IsEmpty, "No writers can be blocked after we've completed."); + } + } + + /// Gets whether there are any waiting readers. This should only be used by the debugger. + private bool WaitingReadersForDebugger => _waitingReaders != null; + /// Gets the number of blocked writers. This should only be used by the debugger. + private int BlockedWritersCountForDebugger => _blockedWriters.Count; + + private sealed class DebugView + { + private readonly UnbufferedChannel _channel; + + public DebugView(UnbufferedChannel channel) => _channel = channel; + + public bool WaitingReaders => _channel._waitingReaders != null; + public T[] BlockedWriters + { + get + { + var items = new List(); + foreach (WriterInteractor blockedWriter in _channel._blockedWriters) + { + items.Add(blockedWriter.Item); + } + return items.ToArray(); + } + } + } + } +} diff --git a/src/libraries/System.Threading.Channels/src/System/VoidResult.cs b/src/libraries/System.Threading.Channels/src/System/VoidResult.cs new file mode 100644 index 0000000..43e8f44 --- /dev/null +++ b/src/libraries/System.Threading.Channels/src/System/VoidResult.cs @@ -0,0 +1,9 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +namespace System +{ + /// An empty struct, used to represent void in generic types. + internal struct VoidResult { } +} diff --git a/src/libraries/System.Threading.Channels/tests/BoundedChannelTests.cs b/src/libraries/System.Threading.Channels/tests/BoundedChannelTests.cs new file mode 100644 index 0000000..b19bc86 --- /dev/null +++ b/src/libraries/System.Threading.Channels/tests/BoundedChannelTests.cs @@ -0,0 +1,402 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System.Threading.Tasks; +using Xunit; + +namespace System.Threading.Channels.Tests +{ + public class BoundedChannelTests : ChannelTestBase + { + protected override Channel CreateChannel() => Channel.CreateBounded(1); + protected override Channel CreateFullChannel() + { + var c = Channel.CreateBounded(1); + c.Writer.WriteAsync(42).Wait(); + return c; + } + + [Theory] + [InlineData(1)] + [InlineData(10)] + [InlineData(10000)] + public void TryWrite_TryRead_Many_Wait(int bufferedCapacity) + { + var c = Channel.CreateBounded(bufferedCapacity); + + for (int i = 0; i < bufferedCapacity; i++) + { + Assert.True(c.Writer.TryWrite(i)); + } + Assert.False(c.Writer.TryWrite(bufferedCapacity)); + + int result; + for (int i = 0; i < bufferedCapacity; i++) + { + Assert.True(c.Reader.TryRead(out result)); + Assert.Equal(i, result); + } + + Assert.False(c.Reader.TryRead(out result)); + Assert.Equal(0, result); + } + + [Theory] + [InlineData(1)] + [InlineData(10)] + [InlineData(10000)] + public void TryWrite_TryRead_Many_DropOldest(int bufferedCapacity) + { + var c = Channel.CreateBounded(new BoundedChannelOptions(bufferedCapacity) { FullMode = BoundedChannelFullMode.DropOldest }); + + for (int i = 0; i < bufferedCapacity * 2; i++) + { + Assert.True(c.Writer.TryWrite(i)); + } + + int result; + for (int i = bufferedCapacity; i < bufferedCapacity * 2; i++) + { + Assert.True(c.Reader.TryRead(out result)); + Assert.Equal(i, result); + } + + Assert.False(c.Reader.TryRead(out result)); + Assert.Equal(0, result); + } + + [Theory] + [InlineData(1)] + [InlineData(10)] + [InlineData(10000)] + public void WriteAsync_TryRead_Many_DropOldest(int bufferedCapacity) + { + var c = Channel.CreateBounded(new BoundedChannelOptions(bufferedCapacity) { FullMode = BoundedChannelFullMode.DropOldest }); + + for (int i = 0; i < bufferedCapacity * 2; i++) + { + AssertSynchronousSuccess(c.Writer.WriteAsync(i)); + } + + int result; + for (int i = bufferedCapacity; i < bufferedCapacity * 2; i++) + { + Assert.True(c.Reader.TryRead(out result)); + Assert.Equal(i, result); + } + + Assert.False(c.Reader.TryRead(out result)); + Assert.Equal(0, result); + } + + [Theory] + [InlineData(1)] + [InlineData(10)] + [InlineData(10000)] + public void TryWrite_TryRead_Many_DropNewest(int bufferedCapacity) + { + var c = Channel.CreateBounded(new BoundedChannelOptions(bufferedCapacity) { FullMode = BoundedChannelFullMode.DropNewest }); + + for (int i = 0; i < bufferedCapacity * 2; i++) + { + Assert.True(c.Writer.TryWrite(i)); + } + + int result; + for (int i = 0; i < bufferedCapacity - 1; i++) + { + Assert.True(c.Reader.TryRead(out result)); + Assert.Equal(i, result); + } + Assert.True(c.Reader.TryRead(out result)); + Assert.Equal(bufferedCapacity * 2 - 1, result); + + Assert.False(c.Reader.TryRead(out result)); + Assert.Equal(0, result); + } + + [Theory] + [InlineData(1)] + [InlineData(10)] + [InlineData(10000)] + public void WriteAsync_TryRead_Many_DropNewest(int bufferedCapacity) + { + var c = Channel.CreateBounded(new BoundedChannelOptions(bufferedCapacity) { FullMode = BoundedChannelFullMode.DropNewest }); + + for (int i = 0; i < bufferedCapacity * 2; i++) + { + AssertSynchronousSuccess(c.Writer.WriteAsync(i)); + } + + int result; + for (int i = 0; i < bufferedCapacity - 1; i++) + { + Assert.True(c.Reader.TryRead(out result)); + Assert.Equal(i, result); + } + Assert.True(c.Reader.TryRead(out result)); + Assert.Equal(bufferedCapacity * 2 - 1, result); + + Assert.False(c.Reader.TryRead(out result)); + Assert.Equal(0, result); + } + + [Fact] + public async Task TryWrite_DropNewest_WrappedAroundInternalQueue() + { + var c = Channel.CreateBounded(new BoundedChannelOptions(3) { FullMode = BoundedChannelFullMode.DropNewest }); + + // Move head of dequeue beyond the beginning + Assert.True(c.Writer.TryWrite(1)); + Assert.True(c.Reader.TryRead(out int item)); + Assert.Equal(1, item); + + // Add items to fill the capacity and put the tail at 0 + Assert.True(c.Writer.TryWrite(2)); + Assert.True(c.Writer.TryWrite(3)); + Assert.True(c.Writer.TryWrite(4)); + + // Add an item to overwrite the newest + Assert.True(c.Writer.TryWrite(5)); + + // Verify current contents + Assert.Equal(2, await c.Reader.ReadAsync()); + Assert.Equal(3, await c.Reader.ReadAsync()); + Assert.Equal(5, await c.Reader.ReadAsync()); + } + + [Theory] + [InlineData(1)] + [InlineData(10)] + [InlineData(10000)] + public void TryWrite_TryRead_Many_Ignore(int bufferedCapacity) + { + var c = Channel.CreateBounded(new BoundedChannelOptions(bufferedCapacity) { FullMode = BoundedChannelFullMode.DropWrite }); + + for (int i = 0; i < bufferedCapacity * 2; i++) + { + Assert.True(c.Writer.TryWrite(i)); + } + + int result; + for (int i = 0; i < bufferedCapacity; i++) + { + Assert.True(c.Reader.TryRead(out result)); + Assert.Equal(i, result); + } + + Assert.False(c.Reader.TryRead(out result)); + Assert.Equal(0, result); + } + + [Theory] + [InlineData(1)] + [InlineData(10)] + [InlineData(10000)] + public void WriteAsync_TryRead_Many_Ignore(int bufferedCapacity) + { + var c = Channel.CreateBounded(new BoundedChannelOptions(bufferedCapacity) { FullMode = BoundedChannelFullMode.DropWrite }); + + for (int i = 0; i < bufferedCapacity * 2; i++) + { + AssertSynchronousSuccess(c.Writer.WriteAsync(i)); + } + + int result; + for (int i = 0; i < bufferedCapacity; i++) + { + Assert.True(c.Reader.TryRead(out result)); + Assert.Equal(i, result); + } + + Assert.False(c.Reader.TryRead(out result)); + Assert.Equal(0, result); + } + + [Fact] + public async Task CancelPendingWrite_Reading_DataTransferredFromCorrectWriter() + { + var c = Channel.CreateBounded(1); + Assert.Equal(TaskStatus.RanToCompletion, c.Writer.WriteAsync(42).Status); + + var cts = new CancellationTokenSource(); + + Task write1 = c.Writer.WriteAsync(43, cts.Token); + Assert.Equal(TaskStatus.WaitingForActivation, write1.Status); + + cts.Cancel(); + + Task write2 = c.Writer.WriteAsync(44); + + Assert.Equal(42, await c.Reader.ReadAsync()); + Assert.Equal(44, await c.Reader.ReadAsync()); + + await AssertCanceled(write1, cts.Token); + await write2; + } + + [Theory] + [InlineData(1)] + [InlineData(10)] + [InlineData(10000)] + public void TryWrite_TryRead_OneAtATime(int bufferedCapacity) + { + var c = Channel.CreateBounded(bufferedCapacity); + + const int NumItems = 100000; + for (int i = 0; i < NumItems; i++) + { + Assert.True(c.Writer.TryWrite(i)); + Assert.True(c.Reader.TryRead(out int result)); + Assert.Equal(i, result); + } + } + + [Theory] + [InlineData(1)] + [InlineData(10)] + [InlineData(10000)] + public void SingleProducerConsumer_ConcurrentReadWrite_WithBufferedCapacity_Success(int bufferedCapacity) + { + var c = Channel.CreateBounded(bufferedCapacity); + + const int NumItems = 10000; + Task.WaitAll( + Task.Run(async () => + { + for (int i = 0; i < NumItems; i++) + { + await c.Writer.WriteAsync(i); + } + }), + Task.Run(async () => + { + for (int i = 0; i < NumItems; i++) + { + Assert.Equal(i, await c.Reader.ReadAsync()); + } + })); + } + + [Theory] + [InlineData(1)] + [InlineData(10)] + [InlineData(10000)] + public void ManyProducerConsumer_ConcurrentReadWrite_WithBufferedCapacity_Success(int bufferedCapacity) + { + var c = Channel.CreateBounded(bufferedCapacity); + + const int NumWriters = 10; + const int NumReaders = 10; + const int NumItems = 10000; + + long readTotal = 0; + int remainingWriters = NumWriters; + int remainingItems = NumItems; + + Task[] tasks = new Task[NumWriters + NumReaders]; + + for (int i = 0; i < NumReaders; i++) + { + tasks[i] = Task.Run(async () => + { + try + { + while (true) + { + Interlocked.Add(ref readTotal, await c.Reader.ReadAsync()); + } + } + catch (ChannelClosedException) { } + }); + } + + for (int i = 0; i < NumWriters; i++) + { + tasks[NumReaders + i] = Task.Run(async () => + { + while (true) + { + int value = Interlocked.Decrement(ref remainingItems); + if (value < 0) + { + break; + } + await c.Writer.WriteAsync(value + 1); + } + if (Interlocked.Decrement(ref remainingWriters) == 0) + { + c.Writer.Complete(); + } + }); + } + + Task.WaitAll(tasks); + Assert.Equal((NumItems * (NumItems + 1L)) / 2, readTotal); + } + + [Fact] + public async Task WaitToWriteAsync_AfterFullThenRead_ReturnsTrue() + { + var c = Channel.CreateBounded(1); + Assert.True(c.Writer.TryWrite(1)); + + Task write1 = c.Writer.WaitToWriteAsync(); + Assert.False(write1.IsCompleted); + + Task write2 = c.Writer.WaitToWriteAsync(); + Assert.False(write2.IsCompleted); + + Assert.Equal(1, await c.Reader.ReadAsync()); + + Assert.True(await write1); + Assert.True(await write2); + } + + [Theory] + [InlineData(false)] + [InlineData(true)] + public void AllowSynchronousContinuations_WaitToReadAsync_ContinuationsInvokedAccordingToSetting(bool allowSynchronousContinuations) + { + var c = Channel.CreateBounded(new BoundedChannelOptions(1) { AllowSynchronousContinuations = allowSynchronousContinuations }); + + int expectedId = Environment.CurrentManagedThreadId; + Task r = c.Reader.WaitToReadAsync().ContinueWith(_ => + { + Assert.Equal(allowSynchronousContinuations, expectedId == Environment.CurrentManagedThreadId); + }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); + + Assert.Equal(TaskStatus.RanToCompletion, c.Writer.WriteAsync(42).Status); + ((IAsyncResult)r).AsyncWaitHandle.WaitOne(); // avoid inlining the continuation + r.GetAwaiter().GetResult(); + } + + [Theory] + [InlineData(false)] + [InlineData(true)] + public void AllowSynchronousContinuations_CompletionTask_ContinuationsInvokedAccordingToSetting(bool allowSynchronousContinuations) + { + var c = Channel.CreateBounded(new BoundedChannelOptions(1) { AllowSynchronousContinuations = allowSynchronousContinuations }); + + int expectedId = Environment.CurrentManagedThreadId; + Task r = c.Reader.Completion.ContinueWith(_ => + { + Assert.Equal(allowSynchronousContinuations, expectedId == Environment.CurrentManagedThreadId); + }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); + + Assert.True(c.Writer.TryComplete()); + ((IAsyncResult)r).AsyncWaitHandle.WaitOne(); // avoid inlining the continuation + r.GetAwaiter().GetResult(); + } + + [Fact] + public void TryWrite_NoBlockedReaders_WaitingReader_WaiterNotifified() + { + Channel c = CreateChannel(); + + Task r = c.Reader.WaitToReadAsync(); + Assert.True(c.Writer.TryWrite(42)); + AssertSynchronousTrue(r); + } + } +} diff --git a/src/libraries/System.Threading.Channels/tests/ChannelClosedExceptionTests.cs b/src/libraries/System.Threading.Channels/tests/ChannelClosedExceptionTests.cs new file mode 100644 index 0000000..38a1e9b --- /dev/null +++ b/src/libraries/System.Threading.Channels/tests/ChannelClosedExceptionTests.cs @@ -0,0 +1,27 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using Xunit; + +namespace System.Threading.Channels.Tests +{ + public class ChannelClosedExceptionTests + { + [Fact] + public void Ctors() + { + var e = new ChannelClosedException(); + Assert.NotEmpty(e.Message); + Assert.Null(e.InnerException); + + e = new ChannelClosedException("hello"); + Assert.Equal("hello", e.Message); + Assert.Null(e.InnerException); + + var inner = new FormatException(); + e = new ChannelClosedException("hello", inner); + Assert.Equal("hello", e.Message); + Assert.Same(inner, e.InnerException); + } + } +} diff --git a/src/libraries/System.Threading.Channels/tests/ChannelTestBase.cs b/src/libraries/System.Threading.Channels/tests/ChannelTestBase.cs new file mode 100644 index 0000000..91b2d71 --- /dev/null +++ b/src/libraries/System.Threading.Channels/tests/ChannelTestBase.cs @@ -0,0 +1,457 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System.Diagnostics; +using System.Threading.Tasks; +using Xunit; + +namespace System.Threading.Channels.Tests +{ + public abstract class ChannelTestBase : TestBase + { + protected abstract Channel CreateChannel(); + protected abstract Channel CreateFullChannel(); + + protected virtual bool RequiresSingleReader => false; + protected virtual bool RequiresSingleWriter => false; + + [Fact] + public void ValidateDebuggerAttributes() + { + Channel c = CreateChannel(); + for (int i = 1; i <= 10; i++) + { + c.Writer.WriteAsync(i); + } + DebuggerAttributes.ValidateDebuggerDisplayReferences(c); + DebuggerAttributes.ValidateDebuggerTypeProxyProperties(c); + } + + [Fact] + public void Cast_MatchesInOut() + { + Channel c = CreateChannel(); + ChannelReader rc = c; + ChannelWriter wc = c; + Assert.Same(rc, c.Reader); + Assert.Same(wc, c.Writer); + } + + [Fact] + public void Completion_Idempotent() + { + Channel c = CreateChannel(); + + Task completion = c.Reader.Completion; + Assert.Equal(TaskStatus.WaitingForActivation, completion.Status); + + Assert.Same(completion, c.Reader.Completion); + c.Writer.Complete(); + Assert.Same(completion, c.Reader.Completion); + + Assert.Equal(TaskStatus.RanToCompletion, completion.Status); + } + + [Fact] + public async Task Complete_AfterEmpty_NoWaiters_TriggersCompletion() + { + Channel c = CreateChannel(); + c.Writer.Complete(); + await c.Reader.Completion; + } + + [Fact] + public async Task Complete_AfterEmpty_WaitingReader_TriggersCompletion() + { + Channel c = CreateChannel(); + Task r = c.Reader.ReadAsync().AsTask(); + c.Writer.Complete(); + await c.Reader.Completion; + await Assert.ThrowsAnyAsync(() => r); + } + + [Fact] + public async Task Complete_BeforeEmpty_WaitingReaders_TriggersCompletion() + { + Channel c = CreateChannel(); + Task read = c.Reader.ReadAsync().AsTask(); + c.Writer.Complete(); + await c.Reader.Completion; + await Assert.ThrowsAnyAsync(() => read); + } + + [Fact] + public void Complete_Twice_ThrowsInvalidOperationException() + { + Channel c = CreateChannel(); + c.Writer.Complete(); + Assert.ThrowsAny(() => c.Writer.Complete()); + } + + [Fact] + public void TryComplete_Twice_ReturnsTrueThenFalse() + { + Channel c = CreateChannel(); + Assert.True(c.Writer.TryComplete()); + Assert.False(c.Writer.TryComplete()); + Assert.False(c.Writer.TryComplete()); + } + + [Fact] + public async Task TryComplete_ErrorsPropage() + { + Channel c; + + // Success + c = CreateChannel(); + Assert.True(c.Writer.TryComplete()); + await c.Reader.Completion; + + // Error + c = CreateChannel(); + Assert.True(c.Writer.TryComplete(new FormatException())); + await Assert.ThrowsAsync(() => c.Reader.Completion); + + // Canceled + c = CreateChannel(); + var cts = new CancellationTokenSource(); + cts.Cancel(); + Assert.True(c.Writer.TryComplete(new OperationCanceledException(cts.Token))); + await AssertCanceled(c.Reader.Completion, cts.Token); + } + + [Fact] + public void SingleProducerConsumer_ConcurrentReadWrite_Success() + { + Channel c = CreateChannel(); + + const int NumItems = 100000; + Task.WaitAll( + Task.Run(async () => + { + for (int i = 0; i < NumItems; i++) + { + await c.Writer.WriteAsync(i); + } + }), + Task.Run(async () => + { + for (int i = 0; i < NumItems; i++) + { + Assert.Equal(i, await c.Reader.ReadAsync()); + } + })); + } + + [Fact] + public void SingleProducerConsumer_PingPong_Success() + { + Channel c1 = CreateChannel(); + Channel c2 = CreateChannel(); + + const int NumItems = 100000; + Task.WaitAll( + Task.Run(async () => + { + for (int i = 0; i < NumItems; i++) + { + Assert.Equal(i, await c1.Reader.ReadAsync()); + await c2.Writer.WriteAsync(i); + } + }), + Task.Run(async () => + { + for (int i = 0; i < NumItems; i++) + { + await c1.Writer.WriteAsync(i); + Assert.Equal(i, await c2.Reader.ReadAsync()); + } + })); + } + + [Theory] + [InlineData(1, 1)] + [InlineData(1, 10)] + [InlineData(10, 1)] + [InlineData(10, 10)] + public void ManyProducerConsumer_ConcurrentReadWrite_Success(int numReaders, int numWriters) + { + if (RequiresSingleReader && numReaders > 1) + { + return; + } + + if (RequiresSingleWriter && numWriters > 1) + { + return; + } + + Channel c = CreateChannel(); + + const int NumItems = 10000; + + long readTotal = 0; + int remainingWriters = numWriters; + int remainingItems = NumItems; + + Task[] tasks = new Task[numWriters + numReaders]; + + for (int i = 0; i < numReaders; i++) + { + tasks[i] = Task.Run(async () => + { + try + { + while (await c.Reader.WaitToReadAsync()) + { + if (c.Reader.TryRead(out int value)) + { + Interlocked.Add(ref readTotal, value); + } + } + } + catch (ChannelClosedException) { } + }); + } + + for (int i = 0; i < numWriters; i++) + { + tasks[numReaders + i] = Task.Run(async () => + { + while (true) + { + int value = Interlocked.Decrement(ref remainingItems); + if (value < 0) + { + break; + } + await c.Writer.WriteAsync(value + 1); + } + if (Interlocked.Decrement(ref remainingWriters) == 0) + { + c.Writer.Complete(); + } + }); + } + + Task.WaitAll(tasks); + Assert.Equal((NumItems * (NumItems + 1L)) / 2, readTotal); + } + + [Fact] + public void WaitToReadAsync_DataAvailableBefore_CompletesSynchronously() + { + Channel c = CreateChannel(); + Task write = c.Writer.WriteAsync(42); + Task read = c.Reader.WaitToReadAsync(); + Assert.Equal(TaskStatus.RanToCompletion, read.Status); + } + + [Fact] + public void WaitToReadAsync_DataAvailableAfter_CompletesAsynchronously() + { + Channel c = CreateChannel(); + Task read = c.Reader.WaitToReadAsync(); + Assert.False(read.IsCompleted); + Task write = c.Writer.WriteAsync(42); + Assert.True(read.Result); + } + + [Fact] + public void WaitToReadAsync_AfterComplete_SynchronouslyCompletes() + { + Channel c = CreateChannel(); + c.Writer.Complete(); + Task read = c.Reader.WaitToReadAsync(); + Assert.Equal(TaskStatus.RanToCompletion, read.Status); + Assert.False(read.Result); + } + + [Fact] + public void WaitToReadAsync_BeforeComplete_AsynchronouslyCompletes() + { + Channel c = CreateChannel(); + Task read = c.Reader.WaitToReadAsync(); + Assert.False(read.IsCompleted); + c.Writer.Complete(); + Assert.False(read.Result); + } + + [Fact] + public void WaitToWriteAsync_AfterComplete_SynchronouslyCompletes() + { + Channel c = CreateChannel(); + c.Writer.Complete(); + Task write = c.Writer.WaitToWriteAsync(); + Assert.Equal(TaskStatus.RanToCompletion, write.Status); + Assert.False(write.Result); + } + + [Fact] + public void TryRead_DataAvailable_Success() + { + Channel c = CreateChannel(); + Task write = c.Writer.WriteAsync(42); + Assert.True(c.Reader.TryRead(out int result)); + Assert.Equal(42, result); + } + + [Fact] + public void TryRead_AfterComplete_ReturnsFalse() + { + Channel c = CreateChannel(); + c.Writer.Complete(); + Assert.False(c.Reader.TryRead(out int result)); + } + + [Fact] + public void TryWrite_AfterComplete_ReturnsFalse() + { + Channel c = CreateChannel(); + c.Writer.Complete(); + Assert.False(c.Writer.TryWrite(42)); + } + + [Fact] + public async Task WriteAsync_AfterComplete_ThrowsException() + { + Channel c = CreateChannel(); + c.Writer.Complete(); + await Assert.ThrowsAnyAsync(() => c.Writer.WriteAsync(42)); + } + + [Fact] + public async Task Complete_WithException_PropagatesToCompletion() + { + Channel c = CreateChannel(); + var exc = new FormatException(); + c.Writer.Complete(exc); + Assert.Same(exc, await Assert.ThrowsAsync(() => c.Reader.Completion)); + } + + [Fact] + public async Task Complete_WithCancellationException_PropagatesToCompletion() + { + Channel c = CreateChannel(); + var cts = new CancellationTokenSource(); + cts.Cancel(); + + Exception exc = null; + try { cts.Token.ThrowIfCancellationRequested(); } + catch (Exception e) { exc = e; } + + c.Writer.Complete(exc); + await AssertCanceled(c.Reader.Completion, cts.Token); + } + + [Fact] + public async Task Complete_WithException_PropagatesToExistingWriter() + { + Channel c = CreateFullChannel(); + if (c != null) + { + Task write = c.Writer.WriteAsync(42); + var exc = new FormatException(); + c.Writer.Complete(exc); + Assert.Same(exc, (await Assert.ThrowsAsync(() => write)).InnerException); + } + } + + [Fact] + public async Task Complete_WithException_PropagatesToNewWriter() + { + Channel c = CreateChannel(); + var exc = new FormatException(); + c.Writer.Complete(exc); + Task write = c.Writer.WriteAsync(42); + Assert.Same(exc, (await Assert.ThrowsAsync(() => write)).InnerException); + } + + [Fact] + public async Task Complete_WithException_PropagatesToExistingWaitingReader() + { + Channel c = CreateChannel(); + Task read = c.Reader.WaitToReadAsync(); + var exc = new FormatException(); + c.Writer.Complete(exc); + await Assert.ThrowsAsync(() => read); + } + + [Fact] + public async Task Complete_WithException_PropagatesToNewWaitingReader() + { + Channel c = CreateChannel(); + var exc = new FormatException(); + c.Writer.Complete(exc); + Task read = c.Reader.WaitToReadAsync(); + await Assert.ThrowsAsync(() => read); + } + + [Fact] + public async Task Complete_WithException_PropagatesToNewWaitingWriter() + { + Channel c = CreateChannel(); + var exc = new FormatException(); + c.Writer.Complete(exc); + Task write = c.Writer.WaitToWriteAsync(); + await Assert.ThrowsAsync(() => write); + } + + [Theory] + [InlineData(0)] + [InlineData(1)] + public void ManyWriteAsync_ThenManyTryRead_Success(int readMode) + { + if (RequiresSingleReader || RequiresSingleWriter) + { + return; + } + + Channel c = CreateChannel(); + + const int NumItems = 2000; + + Task[] writers = new Task[NumItems]; + for (int i = 0; i < writers.Length; i++) + { + writers[i] = c.Writer.WriteAsync(i); + } + + Task[] readers = new Task[NumItems]; + for (int i = 0; i < readers.Length; i++) + { + int result; + Assert.True(c.Reader.TryRead(out result)); + Assert.Equal(i, result); + } + + Assert.All(writers, w => Assert.True(w.IsCompleted)); + } + + [Fact] + public void Precancellation_Writing_ReturnsSuccessImmediately() + { + Channel c = CreateChannel(); + var cts = new CancellationTokenSource(); + cts.Cancel(); + + Task writeTask = c.Writer.WriteAsync(42, cts.Token); + Assert.True(writeTask.Status == TaskStatus.Canceled || writeTask.Status == TaskStatus.RanToCompletion, $"Status == {writeTask.Status}"); + + Task waitTask = c.Writer.WaitToWriteAsync(cts.Token); + Assert.True(writeTask.Status == TaskStatus.Canceled || writeTask.Status == TaskStatus.RanToCompletion, $"Status == {writeTask.Status}"); + if (waitTask.Status == TaskStatus.RanToCompletion) + { + Assert.True(waitTask.Result); + } + } + + [Fact] + public void Write_WaitToReadAsync_CompletesSynchronously() + { + Channel c = CreateChannel(); + c.Writer.WriteAsync(42); + AssertSynchronousTrue(c.Reader.WaitToReadAsync()); + } + } +} diff --git a/src/libraries/System.Threading.Channels/tests/ChannelTests.cs b/src/libraries/System.Threading.Channels/tests/ChannelTests.cs new file mode 100644 index 0000000..ffaf46e --- /dev/null +++ b/src/libraries/System.Threading.Channels/tests/ChannelTests.cs @@ -0,0 +1,145 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.Collections.Generic; +using System.IO; +using System.Threading.Tasks; +using Xunit; + +namespace System.Threading.Channels.Tests +{ + public class ChannelTests + { + [Fact] + public void ChannelOptimizations_Properties_Roundtrip() + { + var co = new UnboundedChannelOptions(); + + Assert.False(co.SingleReader); + Assert.False(co.SingleWriter); + + co.SingleReader = true; + Assert.True(co.SingleReader); + Assert.False(co.SingleWriter); + co.SingleReader = false; + Assert.False(co.SingleReader); + + co.SingleWriter = true; + Assert.False(co.SingleReader); + Assert.True(co.SingleWriter); + co.SingleWriter = false; + Assert.False(co.SingleWriter); + + co.SingleReader = true; + co.SingleWriter = true; + Assert.True(co.SingleReader); + Assert.True(co.SingleWriter); + + Assert.False(co.AllowSynchronousContinuations); + co.AllowSynchronousContinuations = true; + Assert.True(co.AllowSynchronousContinuations); + co.AllowSynchronousContinuations = false; + Assert.False(co.AllowSynchronousContinuations); + } + + [Theory] + [InlineData(0)] + [InlineData(-2)] + public void CreateBounded_InvalidBufferSizes_ThrowArgumentExceptions(int capacity) + { + Assert.Throws("capacity", () => Channel.CreateBounded(capacity)); + Assert.Throws("capacity", () => new BoundedChannelOptions(capacity)); + } + + [Theory] + [InlineData((BoundedChannelFullMode)(-1))] + [InlineData((BoundedChannelFullMode)(4))] + public void BoundedChannelOptions_InvalidModes_ThrowArgumentExceptions(BoundedChannelFullMode mode) => + Assert.Throws("value", () => new BoundedChannelOptions(1) { FullMode = mode }); + + [Theory] + [InlineData(1)] + public void CreateBounded_ValidBufferSizes_Success(int bufferedCapacity) => + Assert.NotNull(Channel.CreateBounded(bufferedCapacity)); + + [Fact] + public async Task DefaultWriteAsync_UsesWaitToWriteAsyncAndTryWrite() + { + var c = new TestChannelWriter(10); + Assert.False(c.TryComplete()); + Assert.Equal(TaskStatus.Canceled, c.WriteAsync(42, new CancellationToken(true)).Status); + + int count = 0; + try + { + while (true) + { + await c.WriteAsync(count++); + } + } + catch (ChannelClosedException) { } + Assert.Equal(11, count); + } + + private sealed class TestChannelWriter : ChannelWriter + { + private readonly Random _rand = new Random(42); + private readonly int _max; + private int _count; + + public TestChannelWriter(int max) => _max = max; + + public override bool TryWrite(T item) => _rand.Next(0, 2) == 0 && _count++ < _max; // succeed if we're under our limit, and add random failures + + public override Task WaitToWriteAsync(CancellationToken cancellationToken) => + _count >= _max ? Task.FromResult(false) : + _rand.Next(0, 2) == 0 ? Task.Delay(1).ContinueWith(_ => true) : // randomly introduce delays + Task.FromResult(true); + } + + private sealed class TestChannelReader : ChannelReader + { + private Random _rand = new Random(42); + private IEnumerator _enumerator; + private int _count; + private bool _closed; + + public TestChannelReader(IEnumerable enumerable) => _enumerator = enumerable.GetEnumerator(); + + public override bool TryRead(out T item) + { + // Randomly fail to read + if (_rand.Next(0, 2) == 0) + { + item = default; + return false; + } + + // If the enumerable is closed, fail the read. + if (!_enumerator.MoveNext()) + { + _enumerator.Dispose(); + _closed = true; + item = default; + return false; + } + + // Otherwise return the next item. + _count++; + item = _enumerator.Current; + return true; + } + + public override Task WaitToReadAsync(CancellationToken cancellationToken) => + _closed ? Task.FromResult(false) : + _rand.Next(0, 2) == 0 ? Task.Delay(1).ContinueWith(_ => true) : // randomly introduce delays + Task.FromResult(true); + } + + private sealed class CanReadFalseStream : MemoryStream + { + public override bool CanRead => false; + } + } +} diff --git a/src/libraries/System.Threading.Channels/tests/Configurations.props b/src/libraries/System.Threading.Channels/tests/Configurations.props new file mode 100644 index 0000000..78953df --- /dev/null +++ b/src/libraries/System.Threading.Channels/tests/Configurations.props @@ -0,0 +1,8 @@ + + + + + netstandard; + + + diff --git a/src/libraries/System.Threading.Channels/tests/DebuggerAttributes.cs b/src/libraries/System.Threading.Channels/tests/DebuggerAttributes.cs new file mode 100644 index 0000000..02e1292 --- /dev/null +++ b/src/libraries/System.Threading.Channels/tests/DebuggerAttributes.cs @@ -0,0 +1,145 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.Collections.Generic; +using System.Linq; +using System.Reflection; + +namespace System.Diagnostics +{ + internal static class DebuggerAttributes + { + internal static object GetFieldValue(object obj, string fieldName) => GetField(obj, fieldName).GetValue(obj); + + internal static void ValidateDebuggerTypeProxyProperties(object obj) + { + // Get the DebuggerTypeProxyAttibute for obj + CustomAttributeData[] attrs = + obj.GetType().GetTypeInfo().CustomAttributes + .Where(a => a.AttributeType == typeof(DebuggerTypeProxyAttribute)) + .ToArray(); + if (attrs.Length != 1) + { + throw new InvalidOperationException( + string.Format("Expected one DebuggerTypeProxyAttribute on {0}.", obj)); + } + CustomAttributeData cad = attrs[0]; + + // Get the proxy type. As written, this only works if the proxy and the target type + // have the same generic parameters, e.g. Dictionary and Proxy. + // It will not work with, for example, Dictionary.Keys and Proxy, + // as the former has two generic parameters and the latter only one. + Type proxyType = cad.ConstructorArguments[0].ArgumentType == typeof(Type) ? + (Type)cad.ConstructorArguments[0].Value : + Type.GetType((string)cad.ConstructorArguments[0].Value); + Type[] genericArguments = obj.GetType().GenericTypeArguments; + if (genericArguments.Length > 0) + { + proxyType = proxyType.MakeGenericType(genericArguments); + } + + // Create an instance of the proxy type, and make sure we can access all of the instance properties + // on the type without exception + object proxyInstance = Activator.CreateInstance(proxyType, obj); + foreach (PropertyInfo pi in proxyInstance.GetType().GetTypeInfo().DeclaredProperties) + { + pi.GetValue(proxyInstance, null); + } + } + + internal static void ValidateDebuggerDisplayReferences(object obj) + { + // Get the DebuggerDisplayAttribute for obj + CustomAttributeData[] attrs = + obj.GetType().GetTypeInfo().CustomAttributes + .Where(a => a.AttributeType == typeof(DebuggerDisplayAttribute)) + .ToArray(); + if (attrs.Length != 1) + { + throw new InvalidOperationException( + string.Format("Expected one DebuggerDisplayAttribute on {0}.", obj)); + } + CustomAttributeData cad = attrs[0]; + + // Get the text of the DebuggerDisplayAttribute + string attrText = (string)cad.ConstructorArguments[0].Value; + + // Parse the text for all expressions + var references = new List(); + int pos = 0; + while (true) + { + int openBrace = attrText.IndexOf('{', pos); + if (openBrace < pos) + { + break; + } + + int closeBrace = attrText.IndexOf('}', openBrace); + if (closeBrace < openBrace) + { + break; + } + + string reference = attrText.Substring(openBrace + 1, closeBrace - openBrace - 1).Replace(",nq", ""); + pos = closeBrace + 1; + + references.Add(reference); + } + if (references.Count == 0) + { + throw new InvalidOperationException( + string.Format("The DebuggerDisplayAttribute for {0} doesn't reference any expressions.", obj)); + } + + // Make sure that each referenced expression is a simple field or property name, and that we can + // invoke the property's get accessor or read from the field. + foreach (string reference in references) + { + PropertyInfo pi = GetProperty(obj, reference); + if (pi != null) + { + object ignored = pi.GetValue(obj, null); + continue; + } + + FieldInfo fi = GetField(obj, reference); + if (fi != null) + { + object ignored = fi.GetValue(obj); + continue; + } + + throw new InvalidOperationException( + string.Format("The DebuggerDisplayAttribute for {0} contains the expression \"{1}\".", obj, reference)); + } + } + + private static FieldInfo GetField(object obj, string fieldName) + { + for (Type t = obj.GetType(); t != null; t = t.GetTypeInfo().BaseType) + { + FieldInfo fi = t.GetTypeInfo().GetDeclaredField(fieldName); + if (fi != null) + { + return fi; + } + } + return null; + } + + private static PropertyInfo GetProperty(object obj, string propertyName) + { + for (Type t = obj.GetType(); t != null; t = t.GetTypeInfo().BaseType) + { + PropertyInfo pi = t.GetTypeInfo().GetDeclaredProperty(propertyName); + if (pi != null) + { + return pi; + } + } + return null; + } + } +} diff --git a/src/libraries/System.Threading.Channels/tests/System.Threading.Channels.Tests.csproj b/src/libraries/System.Threading.Channels/tests/System.Threading.Channels.Tests.csproj new file mode 100644 index 0000000..b34fb28 --- /dev/null +++ b/src/libraries/System.Threading.Channels/tests/System.Threading.Channels.Tests.csproj @@ -0,0 +1,21 @@ + + + + + {95DFC527-4DC1-495E-97D7-E94EE1F7140D} + + + + + + + + + + + + + + + + diff --git a/src/libraries/System.Threading.Channels/tests/TestBase.cs b/src/libraries/System.Threading.Channels/tests/TestBase.cs new file mode 100644 index 0000000..d3af1ba --- /dev/null +++ b/src/libraries/System.Threading.Channels/tests/TestBase.cs @@ -0,0 +1,48 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.Threading.Tasks; +using Xunit; + +#pragma warning disable 0649 // unused fields there for future testing needs + +namespace System.Threading.Channels.Tests +{ + public abstract class TestBase + { + protected void AssertSynchronouslyCanceled(Task task, CancellationToken token) + { + Assert.Equal(TaskStatus.Canceled, task.Status); + OperationCanceledException oce = Assert.ThrowsAny(() => task.GetAwaiter().GetResult()); + Assert.Equal(token, oce.CancellationToken); + } + + protected async Task AssertCanceled(Task task, CancellationToken token) + { + await Assert.ThrowsAnyAsync(() => task); + AssertSynchronouslyCanceled(task, token); + } + + protected void AssertSynchronousSuccess(Task task) => Assert.Equal(TaskStatus.RanToCompletion, task.Status); + + protected void AssertSynchronousTrue(Task task) + { + AssertSynchronousSuccess(task); + Assert.True(task.Result); + } + + internal sealed class DelegateObserver : IObserver + { + public Action OnNextDelegate = null; + public Action OnErrorDelegate = null; + public Action OnCompletedDelegate = null; + + void IObserver.OnNext(T value) => OnNextDelegate?.Invoke(value); + + void IObserver.OnError(Exception error) => OnErrorDelegate?.Invoke(error); + + void IObserver.OnCompleted() => OnCompletedDelegate?.Invoke(); + } + } +} diff --git a/src/libraries/System.Threading.Channels/tests/TestExtensions.cs b/src/libraries/System.Threading.Channels/tests/TestExtensions.cs new file mode 100644 index 0000000..5cf8d40 --- /dev/null +++ b/src/libraries/System.Threading.Channels/tests/TestExtensions.cs @@ -0,0 +1,36 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.Threading.Tasks; + +namespace System.Threading.Channels.Tests +{ + internal static class TestExtensions + { + public static async ValueTask ReadAsync(this ChannelReader reader, CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + + try + { + while (true) + { + if (!await reader.WaitToReadAsync(cancellationToken)) + { + throw new ChannelClosedException(); + } + + if (reader.TryRead(out T item)) + { + return item; + } + } + } + catch (Exception exc) when (!(exc is ChannelClosedException)) + { + throw new ChannelClosedException(exc); + } + } + } +} diff --git a/src/libraries/System.Threading.Channels/tests/UnboundedChannelTests.cs b/src/libraries/System.Threading.Channels/tests/UnboundedChannelTests.cs new file mode 100644 index 0000000..f3d6b67 --- /dev/null +++ b/src/libraries/System.Threading.Channels/tests/UnboundedChannelTests.cs @@ -0,0 +1,213 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.Diagnostics; +using System.Threading.Tasks; +using Xunit; + +namespace System.Threading.Channels.Tests +{ + public abstract class UnboundedChannelTests : ChannelTestBase + { + protected abstract bool AllowSynchronousContinuations { get; } + protected override Channel CreateChannel() => Channel.CreateUnbounded( + new UnboundedChannelOptions + { + SingleReader = RequiresSingleReader, + AllowSynchronousContinuations = AllowSynchronousContinuations + }); + protected override Channel CreateFullChannel() => null; + + [Fact] + public async Task Complete_BeforeEmpty_NoWaiters_TriggersCompletion() + { + Channel c = CreateChannel(); + Assert.True(c.Writer.TryWrite(42)); + c.Writer.Complete(); + Assert.False(c.Reader.Completion.IsCompleted); + Assert.Equal(42, await c.Reader.ReadAsync()); + await c.Reader.Completion; + } + + [Fact] + public void TryWrite_TryRead_Many() + { + Channel c = CreateChannel(); + + const int NumItems = 100000; + for (int i = 0; i < NumItems; i++) + { + Assert.True(c.Writer.TryWrite(i)); + } + for (int i = 0; i < NumItems; i++) + { + Assert.True(c.Reader.TryRead(out int result)); + Assert.Equal(i, result); + } + } + + [Fact] + public void TryWrite_TryRead_OneAtATime() + { + Channel c = CreateChannel(); + + for (int i = 0; i < 10; i++) + { + Assert.True(c.Writer.TryWrite(i)); + Assert.True(c.Reader.TryRead(out int result)); + Assert.Equal(i, result); + } + } + + [Fact] + public void WaitForReadAsync_DataAvailable_CompletesSynchronously() + { + Channel c = CreateChannel(); + Assert.True(c.Writer.TryWrite(42)); + AssertSynchronousTrue(c.Reader.WaitToReadAsync()); + } + + [Theory] + [InlineData(0)] + [InlineData(1)] + public async Task WriteMany_ThenComplete_SuccessfullyReadAll(int readMode) + { + Channel c = CreateChannel(); + for (int i = 0; i < 10; i++) + { + Assert.True(c.Writer.TryWrite(i)); + } + + c.Writer.Complete(); + Assert.False(c.Reader.Completion.IsCompleted); + + for (int i = 0; i < 10; i++) + { + Assert.False(c.Reader.Completion.IsCompleted); + switch (readMode) + { + case 0: + int result; + Assert.True(c.Reader.TryRead(out result)); + Assert.Equal(i, result); + break; + case 1: + Assert.Equal(i, await c.Reader.ReadAsync()); + break; + } + } + + await c.Reader.Completion; + } + + [Fact] + public void AllowSynchronousContinuations_WaitToReadAsync_ContinuationsInvokedAccordingToSetting() + { + Channel c = CreateChannel(); + + int expectedId = Environment.CurrentManagedThreadId; + Task r = c.Reader.WaitToReadAsync().ContinueWith(_ => + { + Assert.Equal(AllowSynchronousContinuations, expectedId == Environment.CurrentManagedThreadId); + }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); + + Assert.Equal(TaskStatus.RanToCompletion, c.Writer.WriteAsync(42).Status); + ((IAsyncResult)r).AsyncWaitHandle.WaitOne(); // avoid inlining the continuation + r.GetAwaiter().GetResult(); + } + + [Fact] + public void AllowSynchronousContinuations_CompletionTask_ContinuationsInvokedAccordingToSetting() + { + Channel c = CreateChannel(); + + int expectedId = Environment.CurrentManagedThreadId; + Task r = c.Reader.Completion.ContinueWith(_ => + { + Assert.Equal(AllowSynchronousContinuations, expectedId == Environment.CurrentManagedThreadId); + }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); + + Assert.True(c.Writer.TryComplete()); + ((IAsyncResult)r).AsyncWaitHandle.WaitOne(); // avoid inlining the continuation + r.GetAwaiter().GetResult(); + } + } + + public abstract class SingleReaderUnboundedChannelTests : UnboundedChannelTests + { + protected override bool RequiresSingleReader => true; + + [Fact] + public void ValidateInternalDebuggerAttributes() + { + Channel c = CreateChannel(); + Assert.True(c.Writer.TryWrite(1)); + Assert.True(c.Writer.TryWrite(2)); + + object queue = DebuggerAttributes.GetFieldValue(c, "_items"); + DebuggerAttributes.ValidateDebuggerDisplayReferences(queue); + DebuggerAttributes.ValidateDebuggerTypeProxyProperties(queue); + } + + [Fact] + public async Task MultipleWaiters_CancelsPreviousWaiter() + { + Channel c = CreateChannel(); + Task t1 = c.Reader.WaitToReadAsync(); + Task t2 = c.Reader.WaitToReadAsync(); + await Assert.ThrowsAnyAsync(() => t1); + Assert.True(c.Writer.TryWrite(42)); + Assert.True(await t2); + } + + [Fact] + public void Stress_TryWrite_TryRead() + { + const int NumItems = 3000000; + Channel c = CreateChannel(); + + Task.WaitAll( + Task.Run(async () => + { + int received = 0; + while (await c.Reader.WaitToReadAsync()) + { + while (c.Reader.TryRead(out int i)) + { + Assert.Equal(received, i); + received++; + } + } + }), + Task.Run(() => + { + for (int i = 0; i < NumItems; i++) + { + Assert.True(c.Writer.TryWrite(i)); + } + c.Writer.Complete(); + })); + } + } + + public sealed class SyncMultiReaderUnboundedChannelTests : UnboundedChannelTests + { + protected override bool AllowSynchronousContinuations => true; + } + + public sealed class AsyncMultiReaderUnboundedChannelTests : UnboundedChannelTests + { + protected override bool AllowSynchronousContinuations => false; + } + + public sealed class SyncSingleReaderUnboundedChannelTests : SingleReaderUnboundedChannelTests + { + protected override bool AllowSynchronousContinuations => true; + } + + public sealed class AsyncSingleReaderUnboundedChannelTests : SingleReaderUnboundedChannelTests + { + protected override bool AllowSynchronousContinuations => false; + } +} diff --git a/src/libraries/System.Threading.Channels/tests/UnbufferedChannelTests.cs b/src/libraries/System.Threading.Channels/tests/UnbufferedChannelTests.cs new file mode 100644 index 0000000..e7e681d --- /dev/null +++ b/src/libraries/System.Threading.Channels/tests/UnbufferedChannelTests.cs @@ -0,0 +1,94 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.Linq; +using System.Threading.Tasks; +using Xunit; + +namespace System.Threading.Channels.Tests +{ + public class UnbufferedChannelTests : ChannelTestBase + { + protected override Channel CreateChannel() => Channel.CreateUnbuffered(); + protected override Channel CreateFullChannel() => CreateChannel(); + + [Fact] + public async Task Complete_BeforeEmpty_WaitingWriters_TriggersCompletion() + { + Channel c = CreateChannel(); + Task write1 = c.Writer.WriteAsync(42); + Task write2 = c.Writer.WriteAsync(43); + c.Writer.Complete(); + await c.Reader.Completion; + await Assert.ThrowsAnyAsync(() => write1); + await Assert.ThrowsAnyAsync(() => write2); + } + + [Fact] + public void TryReadWrite_NoPartner_Fail() + { + Channel c = CreateChannel(); + Assert.False(c.Writer.TryWrite(42)); + Assert.False(c.Reader.TryRead(out int result)); + Assert.Equal(result, 0); + } + + [Fact] + public void TryRead_WriteAsync_Success() + { + Channel c = CreateChannel(); + Task w = c.Writer.WriteAsync(42); + Assert.False(w.IsCompleted); + Assert.True(c.Reader.TryRead(out int result)); + Assert.Equal(42, result); + } + + [Theory] + [InlineData(false)] + [InlineData(true)] + public async Task Read_MultipleUnpartneredWrites_CancelSome_ReadSucceeds(bool useReadAsync) + { + Channel c = CreateChannel(); + var cts = new CancellationTokenSource(); + + Task[] cancelableWrites = (from i in Enumerable.Range(0, 10) select c.Writer.WriteAsync(42, cts.Token)).ToArray(); + Assert.All(cancelableWrites, cw => Assert.Equal(TaskStatus.WaitingForActivation, cw.Status)); + + Task w = c.Writer.WriteAsync(84); + + cts.Cancel(); + foreach (Task t in cancelableWrites) + { + await AssertCanceled(t, cts.Token); + } + + if (useReadAsync) + { + Assert.True(c.Reader.TryRead(out int result)); + Assert.Equal(84, result); + } + else + { + Assert.Equal(84, await c.Reader.ReadAsync()); + } + } + + [Fact] + public async Task Cancel_PartneredWrite_Success() + { + Channel c = CreateChannel(); + var cts = new CancellationTokenSource(); + + Task w = c.Writer.WriteAsync(42, cts.Token); + Assert.False(w.IsCompleted); + + ValueTask r = c.Reader.ReadAsync(); + Assert.True(r.IsCompletedSuccessfully); + + cts.Cancel(); + await w; // no throw + } + + } +} diff --git a/src/libraries/pkg/Microsoft.Private.PackageBaseline/packageIndex.json b/src/libraries/pkg/Microsoft.Private.PackageBaseline/packageIndex.json index a84192f..6e68cd7 100644 --- a/src/libraries/pkg/Microsoft.Private.PackageBaseline/packageIndex.json +++ b/src/libraries/pkg/Microsoft.Private.PackageBaseline/packageIndex.json @@ -4736,6 +4736,12 @@ "4.0.3.0": "4.5.0" } }, + "System.Threading.Channels": { + "InboxOn": {}, + "AssemblyVersionInPackageVersion": { + "4.0.0.0": "4.5.0" + } + }, "System.Threading.Overlapped": { "StableVersions": [ "4.0.0", diff --git a/src/libraries/pkg/descriptions.json b/src/libraries/pkg/descriptions.json index 6dbf74f..8135589 100644 --- a/src/libraries/pkg/descriptions.json +++ b/src/libraries/pkg/descriptions.json @@ -1889,6 +1889,14 @@ ] }, { + "Name": "System.Threading.Channels", + "Description": "Provides types for passing data between producers and consumers.", + "CommonTypes": [ + "System.Threading.Channel", + "System.Threading.Channel" + ] + }, + { "Name": "System.Threading.ExecutionContext", "Description": "Provides types for managing the information relevant to a logic thread or task of execution.", "CommonTypes": [ -- 2.7.4