Add an option to do zero byte reads on StreamPipeReader (#49117)
authorDavid Fowler <davidfowl@gmail.com>
Thu, 11 Mar 2021 07:49:43 +0000 (23:49 -0800)
committerGitHub <noreply@github.com>
Thu, 11 Mar 2021 07:49:43 +0000 (23:49 -0800)
- Added UseZeroByteReads to StreamPipeReaderOptions that allows not allocating a buffer by doing a zero byte read on the underlying Stream before the internal buffer is allocated.

src/libraries/System.IO.Pipelines/ref/System.IO.Pipelines.cs
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeReader.cs
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeReaderOptions.cs
src/libraries/System.IO.Pipelines/tests/StreamPipeReaderTests.cs

index c3536cd..b038877 100644 (file)
@@ -94,11 +94,13 @@ namespace System.IO.Pipelines
     }
     public partial class StreamPipeReaderOptions
     {
-        public StreamPipeReaderOptions(System.Buffers.MemoryPool<byte>? pool = null, int bufferSize = -1, int minimumReadSize = -1, bool leaveOpen = false) { }
+        public StreamPipeReaderOptions(System.Buffers.MemoryPool<byte>? pool, int bufferSize, int minimumReadSize, bool leaveOpen) { }
+        public StreamPipeReaderOptions(System.Buffers.MemoryPool<byte>? pool = null, int bufferSize = -1, int minimumReadSize = -1, bool leaveOpen = false, bool useZeroByteReads = false) { }
         public int BufferSize { get { throw null; } }
         public bool LeaveOpen { get { throw null; } }
         public int MinimumReadSize { get { throw null; } }
         public System.Buffers.MemoryPool<byte> Pool { get { throw null; } }
+        public bool UseZeroByteReads { get { throw null; } }
     }
     public partial class StreamPipeWriterOptions
     {
index b283ad4..0297ca8 100644 (file)
@@ -13,10 +13,6 @@ namespace System.IO.Pipelines
         internal const int InitialSegmentPoolSize = 4; // 16K
         internal const int MaxSegmentPoolSize = 256; // 1MB
 
-        private readonly int _bufferSize;
-        private readonly int _minimumReadThreshold;
-        private readonly MemoryPool<byte>? _pool;
-
         private CancellationTokenSource? _internalTokenSource;
         private bool _isReaderCompleted;
         private bool _isStreamCompleted;
@@ -31,7 +27,8 @@ namespace System.IO.Pipelines
 
         // Mutable struct! Don't make this readonly
         private BufferSegmentStack _bufferSegmentPool;
-        private readonly bool _leaveOpen;
+
+        private StreamPipeReaderOptions _options;
 
         /// <summary>
         /// Creates a new StreamPipeReader.
@@ -47,13 +44,17 @@ namespace System.IO.Pipelines
                 throw new ArgumentNullException(nameof(options));
             }
 
+            _options = options;
             _bufferSegmentPool = new BufferSegmentStack(InitialSegmentPoolSize);
-            _minimumReadThreshold = Math.Min(options.MinimumReadSize, options.BufferSize);
-            _pool = options.Pool == MemoryPool<byte>.Shared ? null : options.Pool;
-            _bufferSize = _pool == null ? options.BufferSize : Math.Min(options.BufferSize, _pool.MaxBufferSize);
-            _leaveOpen = options.LeaveOpen;
         }
 
+        // All derived from the options
+        private bool LeaveOpen => _options.LeaveOpen;
+        private bool UseZeroByteReads => _options.UseZeroByteReads;
+        private int BufferSize => _options.BufferSize;
+        private int MinimumReadThreshold => _options.MinimumReadSize;
+        private MemoryPool<byte> Pool => _options.Pool;
+
         /// <summary>
         /// Gets the inner stream that is being read from.
         /// </summary>
@@ -180,7 +181,7 @@ namespace System.IO.Pipelines
                 returnSegment.ResetMemory();
             }
 
-            if (!_leaveOpen)
+            if (!LeaveOpen)
             {
                 InnerStream.Dispose();
             }
@@ -215,6 +216,13 @@ namespace System.IO.Pipelines
                 var isCanceled = false;
                 try
                 {
+                    // This optimization only makes sense if we don't have anything buffered
+                    if (UseZeroByteReads && _bufferedBytes == 0)
+                    {
+                        // Wait for data by doing 0 byte read before
+                        await InnerStream.ReadAsync(Memory<byte>.Empty, cancellationToken).ConfigureAwait(false);
+                    }
+
                     AllocateReadTail();
 
                     Memory<byte> buffer = _readTail!.AvailableMemory.Slice(_readTail.End);
@@ -296,7 +304,7 @@ namespace System.IO.Pipelines
 
         private ReadOnlySequence<byte> GetCurrentReadOnlySequence()
         {
-            Debug.Assert(_readHead != null &&_readTail != null);
+            Debug.Assert(_readHead != null && _readTail != null);
             return new ReadOnlySequence<byte>(_readHead, _readIndex, _readTail, _readTail.End);
         }
 
@@ -311,7 +319,7 @@ namespace System.IO.Pipelines
             else
             {
                 Debug.Assert(_readTail != null);
-                if (_readTail.WritableBytes < _minimumReadThreshold)
+                if (_readTail.WritableBytes < MinimumReadThreshold)
                 {
                     BufferSegment nextSegment = AllocateSegment();
                     _readTail.SetNext(nextSegment);
@@ -324,13 +332,13 @@ namespace System.IO.Pipelines
         {
             BufferSegment nextSegment = CreateSegmentUnsynchronized();
 
-            if (_pool is null)
+            if (_options.IsDefaultSharedMemoryPool)
             {
-                nextSegment.SetOwnedMemory(ArrayPool<byte>.Shared.Rent(_bufferSize));
+                nextSegment.SetOwnedMemory(ArrayPool<byte>.Shared.Rent(BufferSize));
             }
             else
             {
-                nextSegment.SetOwnedMemory(_pool.Rent(_bufferSize));
+                nextSegment.SetOwnedMemory(Pool.Rent(BufferSize));
             }
 
             return nextSegment;
index 6ac952f..c170ea5 100644 (file)
@@ -18,10 +18,24 @@ namespace System.IO.Pipelines
         /// <param name="bufferSize">The minimum buffer size to use when renting memory from the <paramref name="pool" />. The default value is 4096.</param>
         /// <param name="minimumReadSize">The threshold of remaining bytes in the buffer before a new buffer is allocated. The default value is 1024.</param>
         /// <param name="leaveOpen"><see langword="true" /> to leave the underlying stream open after the <see cref="System.IO.Pipelines.PipeReader" /> completes; <see langword="false" /> to close it. The default is <see langword="false" />.</param>
-        public StreamPipeReaderOptions(MemoryPool<byte>? pool = null, int bufferSize = -1, int minimumReadSize = -1, bool leaveOpen = false)
+        public StreamPipeReaderOptions(MemoryPool<byte>? pool, int bufferSize, int minimumReadSize, bool leaveOpen) :
+            this(pool, bufferSize, minimumReadSize, leaveOpen, useZeroByteReads: false)
+        {
+
+        }
+
+        /// <summary>Initializes a <see cref="System.IO.Pipelines.StreamPipeReaderOptions" /> instance, optionally specifying a memory pool, a minimum buffer size, a minimum read size, and whether the underlying stream should be left open after the <see cref="System.IO.Pipelines.PipeReader" /> completes.</summary>
+        /// <param name="pool">The memory pool to use when allocating memory. The default value is <see langword="null" />.</param>
+        /// <param name="bufferSize">The minimum buffer size to use when renting memory from the <paramref name="pool" />. The default value is 4096.</param>
+        /// <param name="minimumReadSize">The threshold of remaining bytes in the buffer before a new buffer is allocated. The default value is 1024.</param>
+        /// <param name="leaveOpen"><see langword="true" /> to leave the underlying stream open after the <see cref="System.IO.Pipelines.PipeReader" /> completes; <see langword="false" /> to close it. The default is <see langword="false" />.</param>
+        /// <param name="useZeroByteReads"><see langword="true" /> if reads with an empty buffer should be issued to the underlying stream before allocating memory; otherwise, <see langword="false" />.</param>
+        public StreamPipeReaderOptions(MemoryPool<byte>? pool = null, int bufferSize = -1, int minimumReadSize = -1, bool leaveOpen = false, bool useZeroByteReads = false)
         {
             Pool = pool ?? MemoryPool<byte>.Shared;
 
+            IsDefaultSharedMemoryPool = Pool == MemoryPool<byte>.Shared;
+
             BufferSize =
                 bufferSize == -1 ? DefaultBufferSize :
                 bufferSize <= 0 ? throw new ArgumentOutOfRangeException(nameof(bufferSize)) :
@@ -33,6 +47,8 @@ namespace System.IO.Pipelines
                 minimumReadSize;
 
             LeaveOpen = leaveOpen;
+
+            UseZeroByteReads = useZeroByteReads;
         }
 
         /// <summary>Gets the minimum buffer size to use when renting memory from the <see cref="System.IO.Pipelines.StreamPipeReaderOptions.Pool" />.</summary>
@@ -50,5 +66,14 @@ namespace System.IO.Pipelines
         /// <summary>Gets the value that indicates if the underlying stream should be left open after the <see cref="System.IO.Pipelines.PipeReader" /> completes.</summary>
         /// <value><see langword="true" /> if the underlying stream should be left open after the <see cref="System.IO.Pipelines.PipeReader" /> completes; otherwise, <see langword="false" />.</value>
         public bool LeaveOpen { get; }
+
+        /// <summary>Gets the value that indicates if reads with an empty buffer should be issued to the underlying stream, in order to wait for data to arrive before allocating memory.</summary>
+        /// <value><see langword="true" /> if reads with an empty buffer should be issued to the underlying stream before allocating memory; otherwise, <see langword="false" />.</value>
+        public bool UseZeroByteReads { get; }
+
+        /// <summary>
+        /// Returns true if Pool is <see cref="MemoryPool{Byte}"/>.Shared
+        /// </summary>
+        internal bool IsDefaultSharedMemoryPool { get; }
     }
 }
index 74db88f..69fb704 100644 (file)
@@ -67,8 +67,10 @@ namespace System.IO.Pipelines.Tests
             reader.Complete();
         }
 
-        [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
-        public async Task CanReadMultipleTimes()
+        [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
+        [InlineData(false)]
+        [InlineData(true)]
+        public async Task CanReadMultipleTimes(bool useZeroByteReads)
         {
             // This needs to run inline to synchronize the reader and writer
             TaskCompletionSource<object> waitForRead = null;
@@ -109,7 +111,7 @@ namespace System.IO.Pipelines.Tests
 
             // We're using the pipe here as a way to pump bytes into the reader asynchronously
             var pipe = new Pipe();
-            var options = new StreamPipeReaderOptions(bufferSize: 4096);
+            var options = new StreamPipeReaderOptions(bufferSize: 4096, useZeroByteReads: useZeroByteReads);
             PipeReader reader = PipeReader.Create(pipe.Reader.AsStream(), options);
 
             var writes = new[] { 4096, 1024, 123, 4096, 100 };