Add ReadAtLeastAsync implementation for StreamPipeReader (#52246)
authorEmmanuel André <2341261+manandre@users.noreply.github.com>
Mon, 10 May 2021 16:09:27 +0000 (18:09 +0200)
committerGitHub <noreply@github.com>
Mon, 10 May 2021 16:09:27 +0000 (09:09 -0700)
-  Increase segment size based on minimumSize
- Use MaxBufferSize from custom memory pool
- Align segment allocation logic on Pipe one
- Group ReadAtLeastAsync tests

src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeReader.cs
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeReaderOptions.cs
src/libraries/System.IO.Pipelines/tests/BasePipeReaderReadAtLeastAsyncTests.cs [new file with mode: 0644]
src/libraries/System.IO.Pipelines/tests/Infrastructure/TestMemoryPool.cs
src/libraries/System.IO.Pipelines/tests/PipeReaderReadAtLeastAsyncTests.cs [new file with mode: 0644]
src/libraries/System.IO.Pipelines/tests/PipeReaderWriterFacts.cs
src/libraries/System.IO.Pipelines/tests/ReadAsyncCancellationTests.cs
src/libraries/System.IO.Pipelines/tests/StreamPipeReaderReadAtLeastAsyncTests.cs [new file with mode: 0644]
src/libraries/System.IO.Pipelines/tests/StreamPipeReaderTests.cs
src/libraries/System.IO.Pipelines/tests/System.IO.Pipelines.Tests.csproj

index e092235..dde8775 100644 (file)
@@ -52,6 +52,7 @@ namespace System.IO.Pipelines
         private bool LeaveOpen => _options.LeaveOpen;
         private bool UseZeroByteReads => _options.UseZeroByteReads;
         private int BufferSize => _options.BufferSize;
+        private int MaxBufferSize => _options.MaxBufferSize;
         private int MinimumReadThreshold => _options.MinimumReadSize;
         private MemoryPool<byte> Pool => _options.Pool;
 
@@ -188,74 +189,168 @@ namespace System.IO.Pipelines
         }
 
         /// <inheritdoc />
-        public override async ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
+        public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
         {
             // TODO ReadyAsync needs to throw if there are overlapping reads.
             ThrowIfCompleted();
 
+            cancellationToken.ThrowIfCancellationRequested();
+
             // PERF: store InternalTokenSource locally to avoid querying it twice (which acquires a lock)
             CancellationTokenSource tokenSource = InternalTokenSource;
             if (TryReadInternal(tokenSource, out ReadResult readResult))
             {
-                return readResult;
+                return new ValueTask<ReadResult>(readResult);
             }
 
             if (_isStreamCompleted)
             {
-                return new ReadResult(buffer: default, isCanceled: false, isCompleted: true);
+                ReadResult completedResult = new ReadResult(buffer: default, isCanceled: false, isCompleted: true);
+                return new ValueTask<ReadResult>(completedResult);
             }
 
-            CancellationTokenRegistration reg = default;
-            if (cancellationToken.CanBeCanceled)
-            {
-                reg = cancellationToken.UnsafeRegister(state => ((StreamPipeReader)state!).Cancel(), this);
-            }
+            return Core(this, tokenSource, cancellationToken);
 
-            using (reg)
+            static async ValueTask<ReadResult> Core(StreamPipeReader reader, CancellationTokenSource tokenSource, CancellationToken cancellationToken)
             {
-                var isCanceled = false;
-                try
+                CancellationTokenRegistration reg = default;
+                if (cancellationToken.CanBeCanceled)
                 {
-                    // This optimization only makes sense if we don't have anything buffered
-                    if (UseZeroByteReads && _bufferedBytes == 0)
+                    reg = cancellationToken.UnsafeRegister(state => ((StreamPipeReader)state!).Cancel(), reader);
+                }
+
+                using (reg)
+                {
+                    var isCanceled = false;
+                    try
                     {
-                        // Wait for data by doing 0 byte read before
-                        await InnerStream.ReadAsync(Memory<byte>.Empty, cancellationToken).ConfigureAwait(false);
-                    }
+                        // This optimization only makes sense if we don't have anything buffered
+                        if (reader.UseZeroByteReads && reader._bufferedBytes == 0)
+                        {
+                            // Wait for data by doing 0 byte read before
+                            await reader.InnerStream.ReadAsync(Memory<byte>.Empty, tokenSource.Token).ConfigureAwait(false);
+                        }
 
-                    AllocateReadTail();
+                        reader.AllocateReadTail();
 
-                    Memory<byte> buffer = _readTail!.AvailableMemory.Slice(_readTail.End);
+                        Memory<byte> buffer = reader._readTail!.AvailableMemory.Slice(reader._readTail.End);
 
-                    int length = await InnerStream.ReadAsync(buffer, tokenSource.Token).ConfigureAwait(false);
+                        int length = await reader.InnerStream.ReadAsync(buffer, tokenSource.Token).ConfigureAwait(false);
 
-                    Debug.Assert(length + _readTail.End <= _readTail.AvailableMemory.Length);
+                        Debug.Assert(length + reader._readTail.End <= reader._readTail.AvailableMemory.Length);
 
-                    _readTail.End += length;
-                    _bufferedBytes += length;
+                        reader._readTail.End += length;
+                        reader._bufferedBytes += length;
 
-                    if (length == 0)
+                        if (length == 0)
+                        {
+                            reader._isStreamCompleted = true;
+                        }
+                    }
+                    catch (OperationCanceledException)
                     {
-                        _isStreamCompleted = true;
+                        reader.ClearCancellationToken();
+
+                        if (tokenSource.IsCancellationRequested && !cancellationToken.IsCancellationRequested)
+                        {
+                            // Catch cancellation and translate it into setting isCanceled = true
+                            isCanceled = true;
+                        }
+                        else
+                        {
+                            throw;
+                        }
+
                     }
+
+                    return new ReadResult(reader.GetCurrentReadOnlySequence(), isCanceled, reader._isStreamCompleted);
                 }
-                catch (OperationCanceledException)
+            }
+        }
+
+        protected override ValueTask<ReadResult> ReadAtLeastAsyncCore(int minimumSize, CancellationToken cancellationToken)
+        {
+            // TODO ReadyAsync needs to throw if there are overlapping reads.
+            ThrowIfCompleted();
+
+            cancellationToken.ThrowIfCancellationRequested();
+
+            // PERF: store InternalTokenSource locally to avoid querying it twice (which acquires a lock)
+            CancellationTokenSource tokenSource = InternalTokenSource;
+            if (TryReadInternal(tokenSource, out ReadResult readResult))
+            {
+                if (readResult.Buffer.Length >= minimumSize || readResult.IsCompleted || readResult.IsCanceled)
                 {
-                    ClearCancellationToken();
+                    return new ValueTask<ReadResult>(readResult);
+                }
+            }
+
+            if (_isStreamCompleted)
+            {
+                ReadResult completedResult = new ReadResult(buffer: default, isCanceled: false, isCompleted: true);
+                return new ValueTask<ReadResult>(completedResult);
+            }
 
-                    if (tokenSource.IsCancellationRequested && !cancellationToken.IsCancellationRequested)
+            return Core(this, minimumSize, tokenSource, cancellationToken);
+
+            static async ValueTask<ReadResult> Core(StreamPipeReader reader, int minimumSize, CancellationTokenSource tokenSource, CancellationToken cancellationToken)
+            {
+                CancellationTokenRegistration reg = default;
+                if (cancellationToken.CanBeCanceled)
+                {
+                    reg = cancellationToken.UnsafeRegister(state => ((StreamPipeReader)state!).Cancel(), reader);
+                }
+
+                using (reg)
+                {
+                    var isCanceled = false;
+                    try
                     {
-                        // Catch cancellation and translate it into setting isCanceled = true
-                        isCanceled = true;
+                        // This optimization only makes sense if we don't have anything buffered
+                        if (reader.UseZeroByteReads && reader._bufferedBytes == 0)
+                        {
+                            // Wait for data by doing 0 byte read before
+                            await reader.InnerStream.ReadAsync(Memory<byte>.Empty, tokenSource.Token).ConfigureAwait(false);
+                        }
+
+                        do
+                        {
+                            reader.AllocateReadTail(minimumSize);
+
+                            Memory<byte> buffer = reader._readTail!.AvailableMemory.Slice(reader._readTail.End);
+
+                            int length = await reader.InnerStream.ReadAsync(buffer, tokenSource.Token).ConfigureAwait(false);
+
+                            Debug.Assert(length + reader._readTail.End <= reader._readTail.AvailableMemory.Length);
+
+                            reader._readTail.End += length;
+                            reader._bufferedBytes += length;
+
+                            if (length == 0)
+                            {
+                                reader._isStreamCompleted = true;
+                                break;
+                            }
+                        } while (reader._bufferedBytes < minimumSize);
                     }
-                    else
+                    catch (OperationCanceledException)
                     {
-                        throw;
+                        reader.ClearCancellationToken();
+
+                        if (tokenSource.IsCancellationRequested && !cancellationToken.IsCancellationRequested)
+                        {
+                            // Catch cancellation and translate it into setting isCanceled = true
+                            isCanceled = true;
+                        }
+                        else
+                        {
+                            throw;
+                        }
+
                     }
 
+                    return new ReadResult(reader.GetCurrentReadOnlySequence(), isCanceled, reader._isStreamCompleted);
                 }
-
-                return new ReadResult(GetCurrentReadOnlySequence(), isCanceled, _isStreamCompleted);
             }
         }
 
@@ -434,12 +529,12 @@ namespace System.IO.Pipelines
             return new ReadOnlySequence<byte>(_readHead, _readIndex, _readTail, _readTail.End);
         }
 
-        private void AllocateReadTail()
+        private void AllocateReadTail(int? minimumSize = null)
         {
             if (_readHead == null)
             {
                 Debug.Assert(_readTail == null);
-                _readHead = AllocateSegment();
+                _readHead = AllocateSegment(minimumSize);
                 _readTail = _readHead;
             }
             else
@@ -447,29 +542,45 @@ namespace System.IO.Pipelines
                 Debug.Assert(_readTail != null);
                 if (_readTail.WritableBytes < MinimumReadThreshold)
                 {
-                    BufferSegment nextSegment = AllocateSegment();
+                    BufferSegment nextSegment = AllocateSegment(minimumSize);
                     _readTail.SetNext(nextSegment);
                     _readTail = nextSegment;
                 }
             }
         }
 
-        private BufferSegment AllocateSegment()
+        private BufferSegment AllocateSegment(int? minimumSize = null)
         {
             BufferSegment nextSegment = CreateSegmentUnsynchronized();
 
-            if (_options.IsDefaultSharedMemoryPool)
+            var bufferSize = minimumSize ?? BufferSize;
+            int maxSize = !_options.IsDefaultSharedMemoryPool ? _options.Pool.MaxBufferSize : -1;
+
+            if (bufferSize <= maxSize)
             {
-                nextSegment.SetOwnedMemory(ArrayPool<byte>.Shared.Rent(BufferSize));
+                // Use the specified pool as it fits.
+                int sizeToRequest = GetSegmentSize(bufferSize, maxSize);
+                nextSegment.SetOwnedMemory(_options.Pool.Rent(sizeToRequest));
             }
             else
             {
-                nextSegment.SetOwnedMemory(Pool.Rent(BufferSize));
+                // Use the array pool
+                int sizeToRequest = GetSegmentSize(bufferSize, MaxBufferSize);
+                nextSegment.SetOwnedMemory(ArrayPool<byte>.Shared.Rent(sizeToRequest));
             }
 
             return nextSegment;
         }
 
+        private int GetSegmentSize(int sizeHint, int maxBufferSize)
+        {
+            // First we need to handle case where hint is smaller than minimum segment size
+            sizeHint = Math.Max(BufferSize, sizeHint);
+            // After that adjust it to fit into pools max buffer size
+            int adjustedToMaximumSize = Math.Min(maxBufferSize, sizeHint);
+            return adjustedToMaximumSize;
+        }
+
         private BufferSegment CreateSegmentUnsynchronized()
         {
             if (_bufferSegmentPool.TryPop(out BufferSegment? segment))
index c170ea5..25050cf 100644 (file)
@@ -9,6 +9,7 @@ namespace System.IO.Pipelines
     public class StreamPipeReaderOptions
     {
         private const int DefaultBufferSize = 4096;
+        internal const int DefaultMaxBufferSize = 2048 * 1024;
         private const int DefaultMinimumReadSize = 1024;
 
         internal static readonly StreamPipeReaderOptions s_default = new StreamPipeReaderOptions();
@@ -55,6 +56,10 @@ namespace System.IO.Pipelines
         /// <value>The buffer size.</value>
         public int BufferSize { get; }
 
+        /// <summary>Gets the maximum buffer size to use when renting memory from the <see cref="System.IO.Pipelines.StreamPipeReaderOptions.Pool" />.</summary>
+        /// <value>The maximum buffer size.</value>
+        internal int MaxBufferSize { get; } = DefaultMaxBufferSize;
+
         /// <summary>Gets the threshold of remaining bytes in the buffer before a new buffer is allocated.</summary>
         /// <value>The minimum read size.</value>
         public int MinimumReadSize { get; }
diff --git a/src/libraries/System.IO.Pipelines/tests/BasePipeReaderReadAtLeastAsyncTests.cs b/src/libraries/System.IO.Pipelines/tests/BasePipeReaderReadAtLeastAsyncTests.cs
new file mode 100644 (file)
index 0000000..fa204b7
--- /dev/null
@@ -0,0 +1,11 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+namespace System.IO.Pipelines.Tests
+{
+    public class BasePipeReaderReadAtLeastAsyncTests : ReadAtLeastAsyncTests
+    {
+        private PipeReader? _pipeReader;
+        protected override PipeReader PipeReader => _pipeReader ?? (_pipeReader = new BasePipeReader(Pipe.Reader));
+    }
+}
index 14248f4..9567b03 100644 (file)
@@ -26,7 +26,8 @@ namespace System.IO.Pipelines
             _disposed = true;
         }
 
-        public override int MaxBufferSize => 4096;
+        internal const int DefaultMaxBufferSize = 4096;
+        public override int MaxBufferSize => DefaultMaxBufferSize;
 
         internal void CheckDisposed()
         {
diff --git a/src/libraries/System.IO.Pipelines/tests/PipeReaderReadAtLeastAsyncTests.cs b/src/libraries/System.IO.Pipelines/tests/PipeReaderReadAtLeastAsyncTests.cs
new file mode 100644 (file)
index 0000000..c2aaea3
--- /dev/null
@@ -0,0 +1,165 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Buffers;
+using System.Linq;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Xunit;
+
+namespace System.IO.Pipelines.Tests
+{
+    public class ReadAtLeastAsyncTests
+    {
+        private static readonly PipeOptions s_testOptions = new PipeOptions(readerScheduler: PipeScheduler.Inline, useSynchronizationContext: false);
+
+        protected Pipe Pipe { get; set; }
+        protected virtual PipeReader PipeReader => Pipe.Reader;
+
+        public ReadAtLeastAsyncTests()
+        {
+            Pipe = new Pipe(s_testOptions);
+        }
+
+        protected virtual void SetPipeReaderOptions(MemoryPool<byte>? pool = null, int bufferSize = -1)
+        {
+            PipeOptions options = new PipeOptions(pool, readerScheduler: PipeScheduler.Inline, useSynchronizationContext: false , minimumSegmentSize: bufferSize);
+            Pipe = new Pipe(options);
+        }
+
+        [Fact]
+        public async Task CanWriteAndReadAtLeast()
+        {
+            byte[] bytes = Encoding.ASCII.GetBytes("Hello World");
+
+            await Pipe.Writer.WriteAsync(bytes);
+            ReadResult result = await PipeReader.ReadAtLeastAsync(11);
+            ReadOnlySequence<byte> buffer = result.Buffer;
+
+            Assert.Equal(11, buffer.Length);
+            Assert.Equal("Hello World", Encoding.ASCII.GetString(buffer.ToArray()));
+
+            PipeReader.AdvanceTo(buffer.End);
+        }
+
+        [Fact]
+        public async Task ReadAtLeastShouldNotCompleteIfWriterWroteLessThanMinimum()
+        {
+            byte[] bytes = Encoding.ASCII.GetBytes("Hello World");
+
+            await Pipe.Writer.WriteAsync(bytes.AsMemory(0, 5));
+            ValueTask<ReadResult> task = PipeReader.ReadAtLeastAsync(11);
+
+            Assert.False(task.IsCompleted);
+
+            await Pipe.Writer.WriteAsync(bytes.AsMemory(5));
+
+            ReadResult result = await task;
+
+            ReadOnlySequence<byte> buffer = result.Buffer;
+
+            Assert.Equal(11, buffer.Length);
+            Assert.Equal("Hello World", Encoding.ASCII.GetString(buffer.ToArray()));
+
+            PipeReader.AdvanceTo(buffer.End);
+        }
+
+        [Fact]
+        public async Task CanAlternateReadAtLeastAndRead()
+        {
+            byte[] bytes = Encoding.ASCII.GetBytes("Hello World");
+
+            await Pipe.Writer.WriteAsync(bytes.AsMemory(0, 5));
+            ReadResult result = await PipeReader.ReadAtLeastAsync(3);
+            ReadOnlySequence<byte> buffer = result.Buffer;
+
+            Assert.Equal(5, buffer.Length);
+            Assert.Equal("Hello", Encoding.ASCII.GetString(buffer.ToArray()));
+
+            PipeReader.AdvanceTo(buffer.End);
+
+            await Pipe.Writer.WriteAsync(bytes.AsMemory(5));
+            result = await PipeReader.ReadAsync();
+            buffer = result.Buffer;
+
+            Assert.Equal(6, buffer.Length);
+            Assert.Equal(" World", Encoding.ASCII.GetString(buffer.ToArray()));
+
+            PipeReader.AdvanceTo(buffer.End);
+        }
+
+        [Fact]
+        public async Task ReadAtLeastReturnsIfCompleted()
+        {
+            Pipe.Writer.Complete();
+
+            // Make sure we get the same results (state transitions are working)
+            for (int i = 0; i < 3; i++)
+            {
+                ReadResult result = await PipeReader.ReadAtLeastAsync(100);
+
+                Assert.True(result.IsCompleted);
+
+                PipeReader.AdvanceTo(result.Buffer.End);
+            }
+        }
+
+        [Theory]
+        [InlineData(-1, false)]
+        [InlineData(-1, true)]
+        [InlineData(5, false)]
+        [InlineData(5, true)]
+        public async Task CanReadAtLeast(int bufferSize, bool bufferedRead)
+        {
+            SetPipeReaderOptions(bufferSize: bufferSize);
+            await Pipe.Writer.WriteAsync(Encoding.ASCII.GetBytes("Hello Pipelines World"));
+
+            if (bufferedRead)
+            {
+                ReadResult bufferedReadResult = await PipeReader.ReadAsync();
+                Assert.NotEqual(0, bufferedReadResult.Buffer.Length);
+                PipeReader.AdvanceTo(bufferedReadResult.Buffer.Start);
+            }
+
+            ReadResult readResult = await PipeReader.ReadAtLeastAsync(20);
+            ReadOnlySequence<byte> buffer = readResult.Buffer;
+
+            Assert.Equal(21, buffer.Length);
+
+            var isSingleSegment = bufferSize == -1;
+            // Optimization in StreamPipeReader.ReadAtLeastAsync()
+            if (PipeReader is StreamPipeReader) isSingleSegment |= !bufferedRead; 
+            Assert.Equal(isSingleSegment, buffer.IsSingleSegment);
+
+            Assert.Equal("Hello Pipelines World", Encoding.ASCII.GetString(buffer.ToArray()));
+
+            PipeReader.AdvanceTo(buffer.End);
+            PipeReader.Complete();
+        }
+
+        [Fact]
+        public Task ReadAtLeastAsyncThrowsIfPassedCanceledCancellationToken()
+        {
+            return Assert.ThrowsAsync<OperationCanceledException>(async () => await PipeReader.ReadAtLeastAsync(0, new CancellationToken(true)));
+        }
+
+        [Fact]
+        public async Task WriteAndCancellingPendingReadBeforeReadAtLeastAsync()
+        {
+            byte[] bytes = Encoding.ASCII.GetBytes("Hello World");
+            PipeWriter output = Pipe.Writer;
+            output.Write(bytes);
+            await output.FlushAsync();
+
+            PipeReader.CancelPendingRead();
+
+            ReadResult result = await PipeReader.ReadAtLeastAsync(1000);
+            ReadOnlySequence<byte> buffer = result.Buffer;
+
+            Assert.False(result.IsCompleted);
+            Assert.True(result.IsCanceled);
+            PipeReader.AdvanceTo(buffer.End);
+        }
+    }
+}
index ee6c74e..8ffe91e 100644 (file)
@@ -50,76 +50,6 @@ namespace System.IO.Pipelines.Tests
             _pipe.Reader.AdvanceTo(buffer.End);
         }
 
-        [Theory]
-        [InlineData(false)]
-        [InlineData(true)]
-        public async Task CanWriteAndReadAtLeast(bool baseImplementation)
-        {
-            byte[] bytes = Encoding.ASCII.GetBytes("Hello World");
-            var reader = baseImplementation ? new BasePipeReader(_pipe.Reader) : _pipe.Reader;
-
-            await _pipe.Writer.WriteAsync(bytes);
-            ReadResult result = await reader.ReadAtLeastAsync(11);
-            ReadOnlySequence<byte> buffer = result.Buffer;
-
-            Assert.Equal(11, buffer.Length);
-            Assert.Equal("Hello World", Encoding.ASCII.GetString(buffer.ToArray()));
-
-            reader.AdvanceTo(buffer.End);
-        }
-
-        [Theory]
-        [InlineData(true)]
-        [InlineData(false)]
-        public async Task ReadAtLeastShouldNotCompleteIfWriterWroteLessThanMinimum(bool baseImplementation)
-        {
-            byte[] bytes = Encoding.ASCII.GetBytes("Hello World");
-            var reader = baseImplementation ? new BasePipeReader(_pipe.Reader) : _pipe.Reader;
-
-            await _pipe.Writer.WriteAsync(bytes.AsMemory(0, 5));
-            ValueTask<ReadResult> task = reader.ReadAtLeastAsync(11);
-
-            Assert.False(task.IsCompleted);
-
-            await _pipe.Writer.WriteAsync(bytes.AsMemory(5));
-
-            ReadResult result = await task;
-
-            ReadOnlySequence<byte> buffer = result.Buffer;
-
-            Assert.Equal(11, buffer.Length);
-            Assert.Equal("Hello World", Encoding.ASCII.GetString(buffer.ToArray()));
-
-            reader.AdvanceTo(buffer.End);
-        }
-
-        [Theory]
-        [InlineData(true)]
-        [InlineData(false)]
-        public async Task CanAlternateReadAtLeastAndRead(bool baseImplementation)
-        {
-            byte[] bytes = Encoding.ASCII.GetBytes("Hello World");
-            var reader = baseImplementation ? new BasePipeReader(_pipe.Reader) : _pipe.Reader;
-
-            await _pipe.Writer.WriteAsync(bytes.AsMemory(0, 5));
-            ReadResult result = await reader.ReadAtLeastAsync(3);
-            ReadOnlySequence<byte> buffer = result.Buffer;
-
-            Assert.Equal(5, buffer.Length);
-            Assert.Equal("Hello", Encoding.ASCII.GetString(buffer.ToArray()));
-
-            reader.AdvanceTo(buffer.End);
-
-            await _pipe.Writer.WriteAsync(bytes.AsMemory(5));
-            result = await reader.ReadAsync();
-            buffer = result.Buffer;
-
-            Assert.Equal(6, buffer.Length);
-            Assert.Equal(" World", Encoding.ASCII.GetString(buffer.ToArray()));
-
-            reader.AdvanceTo(buffer.End);
-        }
-
         [Fact]
         public async Task AdvanceResetsCommitHeadIndex()
         {
@@ -664,26 +594,6 @@ namespace System.IO.Pipelines.Tests
             _pipe.Reader.AdvanceTo(result.Buffer.End);
         }
 
-        [Theory]
-        [InlineData(true)]
-        [InlineData(false)]
-        public async Task ReadAtLeastReturnsIfCompleted(bool baseImplementation)
-        {
-            var reader = baseImplementation ? new BasePipeReader(_pipe.Reader) : _pipe.Reader;
-
-            _pipe.Writer.Complete();
-
-            // Make sure we get the same results (state transitions are working)
-            for (int i = 0; i < 3; i++)
-            {
-                ReadResult result = await reader.ReadAtLeastAsync(100);
-
-                Assert.True(result.IsCompleted);
-
-                reader.AdvanceTo(result.Buffer.End);
-            }
-        }
-
         [Fact]
         public void WhenTryReadReturnsFalseDontNeedToCallAdvance()
         {
index 4ed6da1..0e9fe23 100644 (file)
@@ -371,15 +371,6 @@ namespace System.IO.Pipelines.Tests
         }
 
         [Fact]
-        public void ReadAtLeastAsyncThrowsIfPassedCanceledCancellationToken()
-        {
-            var cancellationTokenSource = new CancellationTokenSource();
-            cancellationTokenSource.Cancel();
-
-            Assert.Throws<OperationCanceledException>(() => Pipe.Reader.ReadAtLeastAsync(0, cancellationTokenSource.Token));
-        }
-
-        [Fact]
         public async Task ReadAsyncWithNewCancellationTokenNotAffectedByPrevious()
         {
             await Pipe.Writer.WriteAsync(new byte[] { 0 });
@@ -439,29 +430,6 @@ namespace System.IO.Pipelines.Tests
             Pipe.Reader.AdvanceTo(buffer.End, buffer.End);
         }
 
-        [Theory]
-        [InlineData(true)]
-        [InlineData(false)]
-        public async Task WriteAndCancellingPendingReadBeforeReadAtLeastAsync(bool baseImplementation)
-        {
-            var reader = baseImplementation ? new BasePipeReader(Pipe.Reader) : Pipe.Reader;
-
-            byte[] bytes = Encoding.ASCII.GetBytes("Hello World");
-            PipeWriter output = Pipe.Writer;
-            output.Write(bytes);
-            await output.FlushAsync();
-
-            reader.CancelPendingRead();
-
-            ReadResult result = await reader.ReadAtLeastAsync(1000);
-            ReadOnlySequence<byte> buffer = result.Buffer;
-
-            Assert.False(result.IsCompleted);
-            Assert.True(result.IsCanceled);
-            Assert.Equal("Hello World", Encoding.ASCII.GetString(buffer.ToArray()));
-            reader.AdvanceTo(buffer.End);
-        }
-
         [Fact]
         public async Task ReadAsyncIsNotCancelledWhenCancellationTokenCancelledBetweenReads()
         {
diff --git a/src/libraries/System.IO.Pipelines/tests/StreamPipeReaderReadAtLeastAsyncTests.cs b/src/libraries/System.IO.Pipelines/tests/StreamPipeReaderReadAtLeastAsyncTests.cs
new file mode 100644 (file)
index 0000000..9ebc33e
--- /dev/null
@@ -0,0 +1,51 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Buffers;
+using System.Threading.Tasks;
+using Xunit;
+
+namespace System.IO.Pipelines.Tests
+{
+    public class StreamPipeReaderReadAtLeastAsyncTests : ReadAtLeastAsyncTests
+    {
+        private PipeReader? _pipeReader;
+        protected override PipeReader PipeReader => _pipeReader ?? (_pipeReader = PipeReader.Create(Pipe.Reader.AsStream()));
+
+        protected override void SetPipeReaderOptions(MemoryPool<byte>? pool = null, int bufferSize = -1)
+        {
+            _pipeReader = PipeReader.Create(Pipe.Reader.AsStream(), new StreamPipeReaderOptions(pool, bufferSize));
+        }
+
+        private static Func<DisposeTrackingBufferPool> CustomPoolFunc = () => new DisposeTrackingBufferPool();
+        public static TheoryData<MemoryPool<byte>?, int, bool, bool> TestData =>
+            new TheoryData<MemoryPool<byte>?, int, bool, bool>
+            {
+                // pool, bufferSize, isSingleSegment, isFromCustomPool
+                { default, 1, true, false },
+                { default, StreamPipeReaderOptions.DefaultMaxBufferSize, true, false },
+                { default, StreamPipeReaderOptions.DefaultMaxBufferSize + 1, false, false },
+
+                { CustomPoolFunc(), 1, true, true },
+                { CustomPoolFunc(), TestMemoryPool.DefaultMaxBufferSize, true, true },
+                { CustomPoolFunc(), TestMemoryPool.DefaultMaxBufferSize + 1, true, false },
+                { CustomPoolFunc(), StreamPipeReaderOptions.DefaultMaxBufferSize, true, false },
+                { CustomPoolFunc(), StreamPipeReaderOptions.DefaultMaxBufferSize + 1, false, false },
+            };
+
+        [Theory]
+        [MemberData(nameof(TestData))]
+        public async Task ReadAtLeastAsyncSegmentSizeLessThanMaxBufferSize(DisposeTrackingBufferPool? pool, int bufferSize, bool isSingleSegment, bool isFromCustomPool)
+        {
+            SetPipeReaderOptions(pool);
+            Pipe.Writer.WriteEmpty(bufferSize);
+            var task = Pipe.Writer.FlushAsync();
+            ReadResult readResult = await PipeReader.ReadAtLeastAsync(bufferSize);
+            await task;
+
+            Assert.Equal(isSingleSegment, readResult.Buffer.IsSingleSegment);
+            Assert.Equal(isFromCustomPool, (pool?.CurrentlyRentedBlocks ?? 0) != 0);
+            Assert.Equal(bufferSize, readResult.Buffer.Length);
+        }
+    }
+}
index f0944f5..69fb704 100644 (file)
@@ -32,23 +32,6 @@ namespace System.IO.Pipelines.Tests
         }
 
         [Fact]
-        public async Task CanReadAtLeast()
-        {
-            var stream = new MemoryStream(Encoding.ASCII.GetBytes("Hello World"));
-            var reader = PipeReader.Create(stream);
-
-            ReadResult readResult = await reader.ReadAtLeastAsync(10);
-            ReadOnlySequence<byte> buffer = readResult.Buffer;
-
-            Assert.Equal(11, buffer.Length);
-            Assert.True(buffer.IsSingleSegment);
-            Assert.Equal("Hello World", Encoding.ASCII.GetString(buffer.ToArray()));
-
-            reader.AdvanceTo(buffer.End);
-            reader.Complete();
-        }
-
-        [Fact]
         public async Task TryReadReturnsTrueIfBufferedBytesAndNotExaminedEverything()
         {
             var stream = new MemoryStream(Encoding.ASCII.GetBytes("Hello World"));
index 234fa92..d5b3b44 100644 (file)
@@ -1,4 +1,4 @@
-<Project Sdk="Microsoft.NET.Sdk">
+<Project Sdk="Microsoft.NET.Sdk">
   <PropertyGroup>
     <AllowUnsafeBlocks>true</AllowUnsafeBlocks>
     <TargetFrameworks>$(NetCoreAppCurrent);net461</TargetFrameworks>
@@ -16,6 +16,7 @@
     <Compile Include="Infrastructure\CancelledReadsStream.cs" />
     <Compile Include="BackpressureTests.cs" />
     <Compile Include="Infrastructure\ObserveDisposeStream.cs" />
+    <Compile Include="PipeReaderReadAtLeastAsyncTests.cs" />
     <Compile Include="PipeReaderCopyToAsyncTests.cs" />
     <Compile Include="FlushAsyncCancellationTests.cs" />
     <Compile Include="FlushAsyncCompletionTests.cs" />
@@ -34,6 +35,8 @@
     <Compile Include="ReadResultTests.cs" />
     <Compile Include="SchedulerFacts.cs" />
     <Compile Include="SequencePipeReaderTests.cs" />
+    <Compile Include="BasePipeReaderReadAtLeastAsyncTests.cs" />
+    <Compile Include="StreamPipeReaderReadAtLeastAsyncTests.cs" />
     <Compile Include="StreamPipeReaderCopyToAsyncTests.cs" />
     <Compile Include="StreamPipeReaderTests.cs" />
     <Compile Include="Infrastructure\TestMemoryPool.cs" />
     <Compile Include="PipeReaderStreamTests.nonnetstandard.cs" />
     <Compile Include="PipeReaderWriterStreamTests.nonnetstandard.cs" />
   </ItemGroup>
-  <ItemGroup Condition="'$(TargetFramework)' == '$(NetCoreAppCurrent)'">
+  <ItemGroup>
     <!-- Some internal types are needed, so we reference the implementation assembly, rather than the reference assembly. -->
     <ProjectReference Include="..\src\System.IO.Pipelines.csproj" SkipUseReferenceAssembly="true" />
   </ItemGroup>
-  <ItemGroup Condition="'$(TargetFramework)' == 'net461'">
-    <ProjectReference Include="..\src\System.IO.Pipelines.csproj" />
-  </ItemGroup>
   <ItemGroup Condition="'$(TargetFramework)' == '$(NetCoreAppCurrent)'">
     <ProjectReference Include="$(CommonTestPath)StreamConformanceTests\StreamConformanceTests.csproj" />
   </ItemGroup>