- Added UseZeroByteReads to StreamPipeReaderOptions that allows not allocating a buffer by doing a zero byte read on the underlying Stream before the internal buffer is allocated.
}
public partial class StreamPipeReaderOptions
{
- public StreamPipeReaderOptions(System.Buffers.MemoryPool<byte>? pool = null, int bufferSize = -1, int minimumReadSize = -1, bool leaveOpen = false) { }
+ public StreamPipeReaderOptions(System.Buffers.MemoryPool<byte>? pool, int bufferSize, int minimumReadSize, bool leaveOpen) { }
+ public StreamPipeReaderOptions(System.Buffers.MemoryPool<byte>? pool = null, int bufferSize = -1, int minimumReadSize = -1, bool leaveOpen = false, bool useZeroByteReads = false) { }
public int BufferSize { get { throw null; } }
public bool LeaveOpen { get { throw null; } }
public int MinimumReadSize { get { throw null; } }
public System.Buffers.MemoryPool<byte> Pool { get { throw null; } }
+ public bool UseZeroByteReads { get { throw null; } }
}
public partial class StreamPipeWriterOptions
{
internal const int InitialSegmentPoolSize = 4; // 16K
internal const int MaxSegmentPoolSize = 256; // 1MB
- private readonly int _bufferSize;
- private readonly int _minimumReadThreshold;
- private readonly MemoryPool<byte>? _pool;
-
private CancellationTokenSource? _internalTokenSource;
private bool _isReaderCompleted;
private bool _isStreamCompleted;
// Mutable struct! Don't make this readonly
private BufferSegmentStack _bufferSegmentPool;
- private readonly bool _leaveOpen;
+
+ private StreamPipeReaderOptions _options;
/// <summary>
/// Creates a new StreamPipeReader.
throw new ArgumentNullException(nameof(options));
}
+ _options = options;
_bufferSegmentPool = new BufferSegmentStack(InitialSegmentPoolSize);
- _minimumReadThreshold = Math.Min(options.MinimumReadSize, options.BufferSize);
- _pool = options.Pool == MemoryPool<byte>.Shared ? null : options.Pool;
- _bufferSize = _pool == null ? options.BufferSize : Math.Min(options.BufferSize, _pool.MaxBufferSize);
- _leaveOpen = options.LeaveOpen;
}
+ // All derived from the options
+ private bool LeaveOpen => _options.LeaveOpen;
+ private bool UseZeroByteReads => _options.UseZeroByteReads;
+ private int BufferSize => _options.BufferSize;
+ private int MinimumReadThreshold => _options.MinimumReadSize;
+ private MemoryPool<byte> Pool => _options.Pool;
+
/// <summary>
/// Gets the inner stream that is being read from.
/// </summary>
returnSegment.ResetMemory();
}
- if (!_leaveOpen)
+ if (!LeaveOpen)
{
InnerStream.Dispose();
}
var isCanceled = false;
try
{
+ // This optimization only makes sense if we don't have anything buffered
+ if (UseZeroByteReads && _bufferedBytes == 0)
+ {
+ // Wait for data by doing 0 byte read before
+ await InnerStream.ReadAsync(Memory<byte>.Empty, cancellationToken).ConfigureAwait(false);
+ }
+
AllocateReadTail();
Memory<byte> buffer = _readTail!.AvailableMemory.Slice(_readTail.End);
private ReadOnlySequence<byte> GetCurrentReadOnlySequence()
{
- Debug.Assert(_readHead != null &&_readTail != null);
+ Debug.Assert(_readHead != null && _readTail != null);
return new ReadOnlySequence<byte>(_readHead, _readIndex, _readTail, _readTail.End);
}
else
{
Debug.Assert(_readTail != null);
- if (_readTail.WritableBytes < _minimumReadThreshold)
+ if (_readTail.WritableBytes < MinimumReadThreshold)
{
BufferSegment nextSegment = AllocateSegment();
_readTail.SetNext(nextSegment);
{
BufferSegment nextSegment = CreateSegmentUnsynchronized();
- if (_pool is null)
+ if (_options.IsDefaultSharedMemoryPool)
{
- nextSegment.SetOwnedMemory(ArrayPool<byte>.Shared.Rent(_bufferSize));
+ nextSegment.SetOwnedMemory(ArrayPool<byte>.Shared.Rent(BufferSize));
}
else
{
- nextSegment.SetOwnedMemory(_pool.Rent(_bufferSize));
+ nextSegment.SetOwnedMemory(Pool.Rent(BufferSize));
}
return nextSegment;
/// <param name="bufferSize">The minimum buffer size to use when renting memory from the <paramref name="pool" />. The default value is 4096.</param>
/// <param name="minimumReadSize">The threshold of remaining bytes in the buffer before a new buffer is allocated. The default value is 1024.</param>
/// <param name="leaveOpen"><see langword="true" /> to leave the underlying stream open after the <see cref="System.IO.Pipelines.PipeReader" /> completes; <see langword="false" /> to close it. The default is <see langword="false" />.</param>
- public StreamPipeReaderOptions(MemoryPool<byte>? pool = null, int bufferSize = -1, int minimumReadSize = -1, bool leaveOpen = false)
+ public StreamPipeReaderOptions(MemoryPool<byte>? pool, int bufferSize, int minimumReadSize, bool leaveOpen) :
+ this(pool, bufferSize, minimumReadSize, leaveOpen, useZeroByteReads: false)
+ {
+
+ }
+
+ /// <summary>Initializes a <see cref="System.IO.Pipelines.StreamPipeReaderOptions" /> instance, optionally specifying a memory pool, a minimum buffer size, a minimum read size, and whether the underlying stream should be left open after the <see cref="System.IO.Pipelines.PipeReader" /> completes.</summary>
+ /// <param name="pool">The memory pool to use when allocating memory. The default value is <see langword="null" />.</param>
+ /// <param name="bufferSize">The minimum buffer size to use when renting memory from the <paramref name="pool" />. The default value is 4096.</param>
+ /// <param name="minimumReadSize">The threshold of remaining bytes in the buffer before a new buffer is allocated. The default value is 1024.</param>
+ /// <param name="leaveOpen"><see langword="true" /> to leave the underlying stream open after the <see cref="System.IO.Pipelines.PipeReader" /> completes; <see langword="false" /> to close it. The default is <see langword="false" />.</param>
+ /// <param name="useZeroByteReads"><see langword="true" /> if reads with an empty buffer should be issued to the underlying stream before allocating memory; otherwise, <see langword="false" />.</param>
+ public StreamPipeReaderOptions(MemoryPool<byte>? pool = null, int bufferSize = -1, int minimumReadSize = -1, bool leaveOpen = false, bool useZeroByteReads = false)
{
Pool = pool ?? MemoryPool<byte>.Shared;
+ IsDefaultSharedMemoryPool = Pool == MemoryPool<byte>.Shared;
+
BufferSize =
bufferSize == -1 ? DefaultBufferSize :
bufferSize <= 0 ? throw new ArgumentOutOfRangeException(nameof(bufferSize)) :
minimumReadSize;
LeaveOpen = leaveOpen;
+
+ UseZeroByteReads = useZeroByteReads;
}
/// <summary>Gets the minimum buffer size to use when renting memory from the <see cref="System.IO.Pipelines.StreamPipeReaderOptions.Pool" />.</summary>
/// <summary>Gets the value that indicates if the underlying stream should be left open after the <see cref="System.IO.Pipelines.PipeReader" /> completes.</summary>
/// <value><see langword="true" /> if the underlying stream should be left open after the <see cref="System.IO.Pipelines.PipeReader" /> completes; otherwise, <see langword="false" />.</value>
public bool LeaveOpen { get; }
+
+ /// <summary>Gets the value that indicates if reads with an empty buffer should be issued to the underlying stream, in order to wait for data to arrive before allocating memory.</summary>
+ /// <value><see langword="true" /> if reads with an empty buffer should be issued to the underlying stream before allocating memory; otherwise, <see langword="false" />.</value>
+ public bool UseZeroByteReads { get; }
+
+ /// <summary>
+ /// Returns true if Pool is <see cref="MemoryPool{Byte}"/>.Shared
+ /// </summary>
+ internal bool IsDefaultSharedMemoryPool { get; }
}
}
reader.Complete();
}
- [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
- public async Task CanReadMultipleTimes()
+ [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
+ [InlineData(false)]
+ [InlineData(true)]
+ public async Task CanReadMultipleTimes(bool useZeroByteReads)
{
// This needs to run inline to synchronize the reader and writer
TaskCompletionSource<object> waitForRead = null;
// We're using the pipe here as a way to pump bytes into the reader asynchronously
var pipe = new Pipe();
- var options = new StreamPipeReaderOptions(bufferSize: 4096);
+ var options = new StreamPipeReaderOptions(bufferSize: 4096, useZeroByteReads: useZeroByteReads);
PipeReader reader = PipeReader.Create(pipe.Reader.AsStream(), options);
var writes = new[] { 4096, 1024, 123, 4096, 100 };