Restore Channels ReadAsync implementation (dotnet/corefx#26934)
authorTarek Mahmoud Sayed <tarekms@microsoft.com>
Fri, 9 Feb 2018 01:30:56 +0000 (17:30 -0800)
committerGitHub <noreply@github.com>
Fri, 9 Feb 2018 01:30:56 +0000 (17:30 -0800)
* Restore Channels ReadAsync implementation

* Remove AutoResetAwaiter

* Add Base class  ReadAsync test

* address the feedback

Commit migrated from https://github.com/dotnet/corefx/commit/ce0de73e21635252ceffdb140d3a56c6c19bb565

src/libraries/System.Threading.Channels/src/System/Threading/Channels/BoundedChannel.cs
src/libraries/System.Threading.Channels/src/System/Threading/Channels/ChannelUtilities.cs
src/libraries/System.Threading.Channels/src/System/Threading/Channels/SingleConsumerUnboundedChannel.cs
src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnboundedChannel.cs
src/libraries/System.Threading.Channels/tests/ChannelTests.cs

index 5067325..c80d92d 100644 (file)
@@ -21,6 +21,8 @@ namespace System.Threading.Channels
         private readonly int _bufferedCapacity;
         /// <summary>Items currently stored in the channel waiting to be read.</summary>
         private readonly Dequeue<T> _items = new Dequeue<T>();
+        /// <summary>Readers waiting to read from the channel.</summary>
+        private readonly Dequeue<ReaderInteractor<T>> _blockedReaders = new Dequeue<ReaderInteractor<T>>();
         /// <summary>Writers waiting to write to the channel.</summary>
         private readonly Dequeue<WriterInteractor<T>> _blockedWriters = new Dequeue<WriterInteractor<T>>();
         /// <summary>Task signaled when any WaitToReadAsync waiters should be woken up.</summary>
@@ -77,6 +79,38 @@ namespace System.Threading.Channels
                 return false;
             }
 
+            public override ValueTask<T> ReadAsync(CancellationToken cancellationToken)
+            {
+                if (cancellationToken.IsCancellationRequested)
+                {
+                    return new ValueTask<T>(Task.FromCanceled<T>(cancellationToken));
+                }
+
+                BoundedChannel<T> parent = _parent;
+                lock (parent.SyncObj)
+                {
+                    parent.AssertInvariants();
+
+                    // If there are any items, hand one back.
+                    if (!parent._items.IsEmpty)
+                    {
+                        return new ValueTask<T>(DequeueItemAndPostProcess());
+                    }
+
+                    // There weren't any items.  If we're done writing so that there
+                    // will never be more items, fail.
+                    if (parent._doneWriting != null)
+                    {
+                        return ChannelUtilities.GetInvalidCompletionValueTask<T>(parent._doneWriting);
+                    }
+
+                    // Otherwise, queue the reader.
+                    var reader = ReaderInteractor<T>.Create(parent._runContinuationsAsynchronously, cancellationToken);
+                    parent._blockedReaders.EnqueueTail(reader);
+                    return new ValueTask<T>(reader.Task);
+                }
+            }
+
             public override Task<bool> WaitToReadAsync(CancellationToken cancellationToken)
             {
                 if (cancellationToken.IsCancellationRequested)
@@ -203,12 +237,12 @@ namespace System.Threading.Channels
                     ChannelUtilities.Complete(parent._completion, error);
                 }
 
-                // At this point, _blockedWriters and _waitingReaders/Writers will not be mutated:
+                // At this point, _blockedReaders/Writers and _waitingReaders/Writers will not be mutated:
                 // they're only mutated by readers/writers while holding the lock, and only if _doneWriting is null.
                 // We also know that only one thread (this one) will ever get here, as only that thread
                 // will be the one to transition from _doneWriting false to true.  As such, we can
                 // freely manipulate them without any concurrency concerns.
-
+                ChannelUtilities.FailInteractors<ReaderInteractor<T>, T>(parent._blockedReaders, ChannelUtilities.CreateInvalidCompletionException(error));
                 ChannelUtilities.FailInteractors<WriterInteractor<T>, VoidResult>(parent._blockedWriters, ChannelUtilities.CreateInvalidCompletionException(error));
                 ChannelUtilities.WakeUpWaiters(ref parent._waitingReaders, result: false, error: error);
                 ChannelUtilities.WakeUpWaiters(ref parent._waitingWriters, result: false, error: error);
@@ -219,6 +253,7 @@ namespace System.Threading.Channels
 
             public override bool TryWrite(T item)
             {
+                ReaderInteractor<T> blockedReader = null;
                 ReaderInteractor<bool> waitingReaders = null;
 
                 BoundedChannel<T> parent = _parent;
@@ -237,16 +272,34 @@ namespace System.Threading.Channels
 
                     if (count == 0)
                     {
-                        // There are no items in the channel, which means we may have waiting readers.
-                        // Store the item.
-                        parent._items.EnqueueTail(item);
-                        waitingReaders = parent._waitingReaders;
-                        if (waitingReaders == null)
+                        // There are no items in the channel, which means we may have blocked/waiting readers.
+
+                        // If there are any blocked readers, find one that's not canceled
+                        // and store it to complete outside of the lock, in case it has
+                        // continuations that'll run synchronously
+                        while (!parent._blockedReaders.IsEmpty)
+                        {
+                            ReaderInteractor<T> r = parent._blockedReaders.DequeueHead();
+                            r.UnregisterCancellation(); // ensure that once we grab it, we own its completion
+                            if (!r.Task.IsCompleted)
+                            {
+                                blockedReader = r;
+                                break;
+                            }
+                        }
+
+                        if (blockedReader == null)
                         {
-                            // If no one's waiting to be notified about a 0-to-1 transition, we're done.
-                            return true;
+                            // If there wasn't a blocked reader, then store the item. If no one's waiting
+                            // to be notified about a 0-to-1 transition, we're done.
+                            parent._items.EnqueueTail(item);
+                            waitingReaders = parent._waitingReaders;
+                            if (waitingReaders == null)
+                            {
+                                return true;
+                            }
+                            parent._waitingReaders = null;
                         }
-                        parent._waitingReaders = null;
                     }
                     else if (count < parent._bufferedCapacity)
                     {
@@ -280,11 +333,22 @@ namespace System.Threading.Channels
                     }
                 }
 
-                // We stored an item bringing the count up from 0 to 1.  Alert
-                // any waiting readers that there may be something for them to consume.
-                // Since we're no longer holding the lock, it's possible we'll end up
-                // waking readers that have since come in.
-                waitingReaders.Success(item: true);
+                // We either wrote the item already, or we're transfering it to the blocked reader we grabbed.
+                if (blockedReader != null)
+                {
+                    // Transfer the written item to the blocked reader.
+                    bool success = blockedReader.Success(item);
+                    Debug.Assert(success, "We should always be able to complete the reader.");
+                }
+                else
+                {
+                    // We stored an item bringing the count up from 0 to 1.  Alert
+                    // any waiting readers that there may be something for them to consume.
+                    // Since we're no longer holding the lock, it's possible we'll end up
+                    // waking readers that have since come in.
+                    waitingReaders.Success(item: true);
+                }
+
                 return true;
             }
 
@@ -328,6 +392,7 @@ namespace System.Threading.Channels
                     return Task.FromCanceled(cancellationToken);
                 }
 
+                ReaderInteractor<T> blockedReader = null;
                 ReaderInteractor<bool> waitingReaders = null;
 
                 BoundedChannel<T> parent = _parent;
@@ -346,16 +411,34 @@ namespace System.Threading.Channels
 
                     if (count == 0)
                     {
-                        // There are no items in the channel, which means we may have waiting readers.
-                        // Store the item.
-                        parent._items.EnqueueTail(item);
-                        waitingReaders = parent._waitingReaders;
-                        if (waitingReaders == null)
+                        // There are no items in the channel, which means we may have blocked/waiting readers.
+
+                        // If there are any blocked readers, find one that's not canceled
+                        // and store it to complete outside of the lock, in case it has
+                        // continuations that'll run synchronously
+                        while (!parent._blockedReaders.IsEmpty)
                         {
-                            // If no one's waiting to be notified about a 0-to-1 transition, we're done.
-                            return ChannelUtilities.s_trueTask;
+                            ReaderInteractor<T> r = parent._blockedReaders.DequeueHead();
+                            r.UnregisterCancellation(); // ensure that once we grab it, we own its completion
+                            if (!r.Task.IsCompleted)
+                            {
+                                blockedReader = r;
+                                break;
+                            }
+                        }
+
+                        if (blockedReader == null)
+                        {
+                            // If there wasn't a blocked reader, then store the item. If no one's waiting
+                            // to be notified about a 0-to-1 transition, we're done.
+                            parent._items.EnqueueTail(item);
+                            waitingReaders = parent._waitingReaders;
+                            if (waitingReaders == null)
+                            {
+                                return ChannelUtilities.s_trueTask;
+                            }
+                            parent._waitingReaders = null;
                         }
-                        parent._waitingReaders = null;
                     }
                     else if (count < parent._bufferedCapacity)
                     {
@@ -391,11 +474,22 @@ namespace System.Threading.Channels
                     }
                 }
 
-                // We stored an item bringing the count up from 0 to 1.  Alert
-                // any waiting readers that there may be something for them to consume.
-                // Since we're no longer holding the lock, it's possible we'll end up
-                // waking readers that have since come in.
-                waitingReaders.Success(item: true);
+                // We either wrote the item already, or we're transfering it to the blocked reader we grabbed.
+                if (blockedReader != null)
+                {
+                    // Transfer the written item to the blocked reader.
+                    bool success = blockedReader.Success(item);
+                    Debug.Assert(success, "We should always be able to complete the reader.");
+                }
+                else
+                {
+                    // We stored an item bringing the count up from 0 to 1.  Alert
+                    // any waiting readers that there may be something for them to consume.
+                    // Since we're no longer holding the lock, it's possible we'll end up
+                    // waking readers that have since come in.
+                    waitingReaders.Success(item: true);
+                }
+
                 return ChannelUtilities.s_trueTask;
             }
 
@@ -417,6 +511,7 @@ namespace System.Threading.Channels
 
             if (!_items.IsEmpty)
             {
+                Debug.Assert(_blockedReaders.IsEmpty, "There are items available, so there shouldn't be any blocked readers.");
                 Debug.Assert(_waitingReaders == null, "There are items available, so there shouldn't be any waiting readers.");
             }
             if (_items.Count < _bufferedCapacity)
@@ -424,9 +519,15 @@ namespace System.Threading.Channels
                 Debug.Assert(_blockedWriters.IsEmpty, "There's space available, so there shouldn't be any blocked writers.");
                 Debug.Assert(_waitingWriters == null, "There's space available, so there shouldn't be any waiting writers.");
             }
+            if (!_blockedReaders.IsEmpty)
+            {
+                Debug.Assert(_items.IsEmpty, "There shouldn't be queued items if there's a blocked reader.");
+                Debug.Assert(_blockedWriters.IsEmpty, "There shouldn't be any blocked writer if there's a blocked reader.");
+            }
             if (!_blockedWriters.IsEmpty)
             {
                 Debug.Assert(_items.Count == _bufferedCapacity, "We should have a full buffer if there's a blocked writer.");
+                Debug.Assert(_blockedReaders.IsEmpty, "There shouldn't be any blocked readers if there's a blocked writer.");
             }
             if (_completion.Task.IsCompleted)
             {
index 17d412f..5b4b39b 100644 (file)
@@ -23,7 +23,7 @@ namespace System.Threading.Channels
         /// <summary>Completes the specified TaskCompletionSource.</summary>
         /// <param name="tcs">The source to complete.</param>
         /// <param name="error">
-        /// The optional exception with which to complete.  
+        /// The optional exception with which to complete.
         /// If this is null or the DoneWritingSentinel, the source will be completed successfully.
         /// If this is an OperationCanceledException, it'll be completed with the exception's token.
         /// Otherwise, it'll be completed as faulted with the exception.
@@ -44,6 +44,22 @@ namespace System.Threading.Channels
             }
         }
 
+        /// <summary>Gets a value task representing an error.</summary>
+        /// <typeparam name="T">Specifies the type of the value that would have been returned.</typeparam>
+        /// <param name="error">The error.  This may be <see cref="s_doneWritingSentinel"/>.</param>
+        /// <returns>The failed task.</returns>
+        internal static ValueTask<T> GetInvalidCompletionValueTask<T>(Exception error)
+        {
+            Debug.Assert(error != null);
+
+            Task<T> t =
+                error == s_doneWritingSentinel ? Task.FromException<T>(CreateInvalidCompletionException()) :
+                error is OperationCanceledException oce ? Task.FromCanceled<T>(oce.CancellationToken.IsCancellationRequested ? oce.CancellationToken : new CancellationToken(true)) :
+                Task.FromException<T>(CreateInvalidCompletionException(error));
+
+            return new ValueTask<T>(t);
+        }
+
         /// <summary>Wake up all of the waiters and null out the field.</summary>
         /// <param name="waiters">The waiters.</param>
         /// <param name="result">The value with which to complete each waiter.</param>
index de45e15..aef9904 100644 (file)
@@ -6,6 +6,7 @@ using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.Diagnostics;
 using System.Threading.Tasks;
+using System.Runtime.CompilerServices;
 
 namespace System.Threading.Channels
 {
@@ -31,6 +32,9 @@ namespace System.Threading.Channels
         /// <summary>non-null if the channel has been marked as complete for writing.</summary>
         private volatile Exception _doneWriting;
 
+        /// <summary>A <see cref="ReaderInteractor{T}"/> if there's a blocked reader.</summary>
+        private ReaderInteractor<T> _blockedReader;
+
         /// <summary>A waiting reader (e.g. WaitForReadAsync) if there is one.</summary>
         private ReaderInteractor<bool> _waitingReader;
 
@@ -54,6 +58,47 @@ namespace System.Threading.Channels
 
             public override Task Completion => _parent._completion.Task;
 
+            public override ValueTask<T> ReadAsync(CancellationToken cancellationToken)
+            {
+                {
+                    return TryRead(out T item) ?
+                        new ValueTask<T>(item) :
+                        ReadAsyncCore(cancellationToken);
+                }
+
+                ValueTask<T> ReadAsyncCore(CancellationToken ct)
+                {
+                    SingleConsumerUnboundedChannel<T> parent = _parent;
+                    if (ct.IsCancellationRequested)
+                    {
+                        return new ValueTask<T>(Task.FromCanceled<T>(ct));
+                    }
+
+                    lock (parent.SyncObj)
+                    {
+                        // Now that we hold the lock, try reading again.
+                        if (TryRead(out T item))
+                        {
+                            return new ValueTask<T>(item);
+                        }
+
+                        // If no more items will be written, fail the read.
+                        if (parent._doneWriting != null)
+                        {
+                            return ChannelUtilities.GetInvalidCompletionValueTask<T>(parent._doneWriting);
+                        }
+
+                        Debug.Assert(parent._blockedReader == null || parent._blockedReader.Task.IsCanceled,
+                            "Incorrect usage; multiple outstanding reads were issued against this single-consumer channel");
+
+                        // Store the reader to be completed by a writer.
+                        var reader = ReaderInteractor<T>.Create(parent._runContinuationsAsynchronously, ct);
+                        parent._blockedReader = reader;
+                        return new ValueTask<T>(reader.Task);
+                    }
+                }
+            }
+
             public override bool TryRead(out T item)
             {
                 SingleConsumerUnboundedChannel<T> parent = _parent;
@@ -124,6 +169,7 @@ namespace System.Threading.Channels
 
             public override bool TryComplete(Exception error)
             {
+                ReaderInteractor<T> blockedReader = null;
                 ReaderInteractor<bool> waitingReader = null;
                 bool completeTask = false;
 
@@ -146,6 +192,12 @@ namespace System.Threading.Channels
                     {
                         completeTask = true;
 
+                        if (parent._blockedReader != null)
+                        {
+                            blockedReader = parent._blockedReader;
+                            parent._blockedReader = null;
+                        }
+
                         if (parent._waitingReader != null)
                         {
                             waitingReader = parent._waitingReader;
@@ -160,7 +212,17 @@ namespace System.Threading.Channels
                     ChannelUtilities.Complete(parent._completion, error);
                 }
 
-                // Complete a waiting reader if necessary.
+                Debug.Assert(blockedReader == null || waitingReader == null, "There should only ever be at most one reader.");
+
+                // Complete a blocked reader if necessary
+                if (blockedReader != null)
+                {
+                    error = ChannelUtilities.CreateInvalidCompletionException(error);
+                    blockedReader.Fail(error);
+                }
+
+                // Complete a waiting reader if necessary.  (We really shouldn't have both a blockedReader
+                // and a waitingReader, but it's more expensive to prevent it than to just tolerate it.)
                 if (waitingReader != null)
                 {
                     if (error != null)
@@ -180,33 +242,59 @@ namespace System.Threading.Channels
             public override bool TryWrite(T item)
             {
                 SingleConsumerUnboundedChannel<T> parent = _parent;
-                ReaderInteractor<bool> waitingReader = null;
-
-                lock (parent.SyncObj)
+                while (true) // in case a reader was canceled and we need to try again
                 {
-                    // If writing is completed, exit out without writing.
-                    if (parent._doneWriting != null)
+                    ReaderInteractor<T> blockedReader = null;
+                    ReaderInteractor<bool> waitingReader = null;
+
+                    lock (parent.SyncObj)
                     {
-                        return false;
+                        // If writing is completed, exit out without writing.
+                        if (parent._doneWriting != null)
+                        {
+                            return false;
+                        }
+
+                        // If there's a blocked reader, store it into a local for completion outside of the lock.
+                        // If there isn't a blocked reader, queue the item being written; then if there's a waiting
+                        blockedReader = parent._blockedReader;
+                        if (blockedReader != null)
+                        {
+                            parent._blockedReader = null;
+                        }
+                        else
+                        {
+                            parent._items.Enqueue(item);
+
+                            waitingReader = parent._waitingReader;
+                            if (waitingReader == null)
+                            {
+                                return true;
+                            }
+                            parent._waitingReader = null;
+                        }
                     }
 
-                    // Queue the item being written; then if there's a waiting
-                    // reader, store it for notification outside of the lock.
-                    parent._items.Enqueue(item);
+                    // If we get here, we grabbed a blocked or a waiting reader.
+                    Debug.Assert((blockedReader != null) ^ (waitingReader != null), "Expected either a blocked or waiting reader, but not both");
+
+                    // If we have a waiting reader, notify it that an item was written and exit.
+                    if (waitingReader != null)
+                    {                // If we get here, we grabbed a waiting reader.
+                        waitingReader.Success(item: true);
+                        return true;
+                    }
 
-                    waitingReader = parent._waitingReader;
-                    if (waitingReader == null)
+                    // Otherwise we have a blocked reader: complete it with the item being written.
+                    // In the case of a ReadAsync(CancellationToken), it's possible the reader could
+                    // have been completed due to cancellation by the time we get here.  In that case,
+                    // we'll loop around to try again so as not to lose the item being written.
+                    Debug.Assert(blockedReader != null);
+                    if (blockedReader.Success(item))
                     {
                         return true;
                     }
-                    parent._waitingReader = null;
                 }
-
-                // If we get here, we grabbed a waiting reader.
-                // Notify it that an item was written and exit.
-                Debug.Assert(waitingReader != null, "Expected a waiting reader");
-                waitingReader.Success(item: true);
-                return true;
             }
 
             public override Task<bool> WaitToWriteAsync(CancellationToken cancellationToken)
index c1d6569..4fe6e92 100644 (file)
@@ -18,6 +18,8 @@ namespace System.Threading.Channels
         private readonly TaskCompletionSource<VoidResult> _completion;
         /// <summary>The items in the channel.</summary>
         private readonly ConcurrentQueue<T> _items = new ConcurrentQueue<T>();
+        /// <summary>Readers blocked reading from the channel.</summary>
+        private readonly Dequeue<ReaderInteractor<T>> _blockedReaders = new Dequeue<ReaderInteractor<T>>();
         /// <summary>Whether to force continuations to be executed asynchronously from producer writes.</summary>
         private readonly bool _runContinuationsAsynchronously;
 
@@ -44,6 +46,49 @@ namespace System.Threading.Channels
 
             public override Task Completion => _parent._completion.Task;
 
+            public override ValueTask<T> ReadAsync(CancellationToken cancellationToken) =>
+                TryRead(out T item) ?
+                    new ValueTask<T>(item) :
+                    ReadAsyncCore(cancellationToken);
+
+            private ValueTask<T> ReadAsyncCore(CancellationToken cancellationToken)
+            {
+                if (cancellationToken.IsCancellationRequested)
+                {
+                    return new ValueTask<T>(Task.FromCanceled<T>(cancellationToken));
+                }
+
+                UnboundedChannel<T> parent = _parent;
+                lock (parent.SyncObj)
+                {
+                    parent.AssertInvariants();
+
+                    // If there are any items, return one.
+                    if (parent._items.TryDequeue(out T item))
+                    {
+                        // Dequeue an item
+                        if (parent._doneWriting != null && parent._items.IsEmpty)
+                        {
+                            // If we've now emptied the items queue and we're not getting any more, complete.
+                            ChannelUtilities.Complete(parent._completion, parent._doneWriting);
+                        }
+
+                        return new ValueTask<T>(item);
+                    }
+
+                    // There are no items, so if we're done writing, fail.
+                    if (parent._doneWriting != null)
+                    {
+                        return ChannelUtilities.GetInvalidCompletionValueTask<T>(parent._doneWriting);
+                    }
+
+                    // Otherwise, queue the reader.
+                    var reader = ReaderInteractor<T>.Create(parent._runContinuationsAsynchronously, cancellationToken);
+                    parent._blockedReaders.EnqueueTail(reader);
+                    return new ValueTask<T>(reader.Task);
+                }
+            }
+
             public override bool TryRead(out T item)
             {
                 UnboundedChannel<T> parent = _parent;
@@ -142,11 +187,10 @@ namespace System.Threading.Channels
                     ChannelUtilities.Complete(parent._completion, error);
                 }
 
-                // At this point, _waitingReaders will not be mutated:
-                // it's only mutated by readers while holding the lock, and only if _doneWriting is null.
-                // We also know that only one thread (this one) will ever get here, as only that thread
-                // will be the one to transition from _doneWriting false to true.  As such, we can
-                // freely manipulate _waitingReaders without any concurrency concerns.
+                // At this point, _blockedReaders and _waitingReaders will not be mutated:
+                // they're only mutated by readers while holding the lock, and only if _doneWriting is null.
+                // freely manipulate _blockedReaders and _waitingReaders without any concurrency concerns.
+                ChannelUtilities.FailInteractors<ReaderInteractor<T>, T>(parent._blockedReaders, ChannelUtilities.CreateInvalidCompletionException(error));
                 ChannelUtilities.WakeUpWaiters(ref parent._waitingReaders, result: false, error: error);
 
                 // Successfully transitioned to completed.
@@ -158,6 +202,7 @@ namespace System.Threading.Channels
                 UnboundedChannel<T> parent = _parent;
                 while (true)
                 {
+                    ReaderInteractor<T> blockedReader = null;
                     ReaderInteractor<bool> waitingReaders = null;
                     lock (parent.SyncObj)
                     {
@@ -168,25 +213,47 @@ namespace System.Threading.Channels
                             return false;
                         }
 
-                        // Add the data to the queue, and let any waiting readers know that they should try to read it.
-                        // We can only complete such waiters here under the lock if they run continuations asynchronously
-                        // (otherwise the synchronous continuations could be invoked under the lock).  If we don't complete
-                        // them here, we need to do so outside of the lock.
-                        parent._items.Enqueue(item);
-                        waitingReaders = parent._waitingReaders;
-                        if (waitingReaders == null)
+                        // If there aren't any blocked readers, just add the data to the queue,
+                        // and let any waiting readers know that they should try to read it.
+                        // We can only complete such waiters here under the lock if they run
+                        // continuations asynchronously (otherwise the synchronous continuations
+                        // could be invoked under the lock).  If we don't complete them here, we
+                        // need to do so outside of the lock.
+                        if (parent._blockedReaders.IsEmpty)
                         {
-                            return true;
+                            parent._items.Enqueue(item);
+                            waitingReaders = parent._waitingReaders;
+                            if (waitingReaders == null)
+                            {
+                                return true;
+                            }
+                            parent._waitingReaders = null;
+                        }
+                        else
+                        {
+                            // There were blocked readers.  Grab one, and then complete it outside of the lock.
+                            blockedReader = parent._blockedReaders.DequeueHead();
                         }
-                        parent._waitingReaders = null;
                     }
 
-                    // Wake up all of the waiters.  Since we've released the lock, it's possible
-                    // we could cause some spurious wake-ups here, if we tell a waiter there's
-                    // something available but all data has already been removed.  It's a benign
-                    // race condition, though, as consumers already need to account for such things.
-                    waitingReaders.Success(item: true);
-                    return true;
+                    if (blockedReader != null)
+                    {
+                        // Complete the reader.  It's possible the reader was canceled, in which
+                        // case we loop around to try everything again.
+                        if (blockedReader.Success(item))
+                        {
+                            return true;
+                        }
+                    }
+                    else
+                    {
+                        // Wake up all of the waiters.  Since we've released the lock, it's possible
+                        // we could cause some spurious wake-ups here, if we tell a waiter there's
+                        // something available but all data has already been removed.  It's a benign
+                        // race condition, though, as consumers already need to account for such things.
+                        waitingReaders.Success(item: true);
+                        return true;
+                    }
                 }
             }
 
@@ -225,11 +292,12 @@ namespace System.Threading.Channels
             {
                 if (_runContinuationsAsynchronously)
                 {
+                    Debug.Assert(_blockedReaders.IsEmpty, "There's data available, so there shouldn't be any blocked readers.");
                     Debug.Assert(_waitingReaders == null, "There's data available, so there shouldn't be any waiting readers.");
                 }
                 Debug.Assert(!_completion.Task.IsCompleted, "We still have data available, so shouldn't be completed.");
             }
-            if (_waitingReaders != null && _runContinuationsAsynchronously)
+            if ((!_blockedReaders.IsEmpty || _waitingReaders != null) && _runContinuationsAsynchronously)
             {
                 Debug.Assert(_items.IsEmpty, "There are blocked/waiting readers, so there shouldn't be any data available.");
             }
index b546f6a..c817225 100644 (file)
@@ -131,6 +131,72 @@ namespace System.Threading.Channels.Tests
             await Assert.ThrowsAsync<FieldAccessException>(() => t);
         }
 
+        [Fact]
+        public async void TestBaseClassReadAsync()
+        {
+            WrapperChannel<int> channel = new WrapperChannel<int>(10);
+            ChannelReader<int> reader = channel.Reader;
+            ChannelWriter<int> writer = channel.Writer;
+
+            // 1- do it through synchronous TryRead()
+            writer.TryWrite(50);
+            Assert.Equal(50, await reader.ReadAsync());
+
+            // 2- do it through async
+            ValueTask<int> readTask = reader.ReadAsync();
+            writer.TryWrite(100);
+            Assert.Equal(100, await readTask);
+
+            // 3- use cancellation token
+            CancellationToken ct = new CancellationToken(true); // cancelled token
+            await Assert.ThrowsAsync<TaskCanceledException>(() => reader.ReadAsync(ct).AsTask());
+
+            // 4- throw during reading
+            readTask = reader.ReadAsync();
+            ((WrapperChannelReader<int>)reader).ForceThrowing = true;
+            writer.TryWrite(200);
+            await Assert.ThrowsAsync<ChannelClosedException>(() => readTask.AsTask());
+
+            // 5- close the channel while waiting reading
+            ((WrapperChannelReader<int>)reader).ForceThrowing = false;
+            Assert.Equal(200, await reader.ReadAsync());
+            readTask = reader.ReadAsync();
+            channel.Writer.TryComplete();
+            await Assert.ThrowsAsync<ChannelClosedException>(() => readTask.AsTask());
+        }
+
+        // This reader doesn't override ReadAsync to force using the base class ReadAsync method
+        private sealed class WrapperChannelReader<T> : ChannelReader<T>
+        {
+            private ChannelReader<T> _reader;
+            internal bool ForceThrowing { get; set; }
+
+            public WrapperChannelReader(Channel<T> channel) {_reader = channel.Reader; }
+
+            public override bool TryRead(out T item)
+            {
+                if (ForceThrowing)
+                    throw new InvalidOperationException();
+
+                return _reader.TryRead(out item);
+            }
+
+            public override Task<bool> WaitToReadAsync(CancellationToken cancellationToken)
+            {
+                return _reader.WaitToReadAsync(cancellationToken);
+            }
+        }
+
+        public class WrapperChannel<T> : Channel<T>
+        {
+            public WrapperChannel(int capacity)
+            {
+                Channel<T> channel = Channel.CreateBounded<T>(capacity);
+                Writer = channel.Writer;
+                Reader = new WrapperChannelReader<T>(channel);
+            }
+        }
+
         private sealed class TestChannelWriter<T> : ChannelWriter<T>
         {
             private readonly Random _rand = new Random(42);