private bool LeaveOpen => _options.LeaveOpen;
private bool UseZeroByteReads => _options.UseZeroByteReads;
private int BufferSize => _options.BufferSize;
+ private int MaxBufferSize => _options.MaxBufferSize;
private int MinimumReadThreshold => _options.MinimumReadSize;
private MemoryPool<byte> Pool => _options.Pool;
}
/// <inheritdoc />
- public override async ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
+ public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
{
// TODO ReadyAsync needs to throw if there are overlapping reads.
ThrowIfCompleted();
+ cancellationToken.ThrowIfCancellationRequested();
+
// PERF: store InternalTokenSource locally to avoid querying it twice (which acquires a lock)
CancellationTokenSource tokenSource = InternalTokenSource;
if (TryReadInternal(tokenSource, out ReadResult readResult))
{
- return readResult;
+ return new ValueTask<ReadResult>(readResult);
}
if (_isStreamCompleted)
{
- return new ReadResult(buffer: default, isCanceled: false, isCompleted: true);
+ ReadResult completedResult = new ReadResult(buffer: default, isCanceled: false, isCompleted: true);
+ return new ValueTask<ReadResult>(completedResult);
}
- CancellationTokenRegistration reg = default;
- if (cancellationToken.CanBeCanceled)
- {
- reg = cancellationToken.UnsafeRegister(state => ((StreamPipeReader)state!).Cancel(), this);
- }
+ return Core(this, tokenSource, cancellationToken);
- using (reg)
+ static async ValueTask<ReadResult> Core(StreamPipeReader reader, CancellationTokenSource tokenSource, CancellationToken cancellationToken)
{
- var isCanceled = false;
- try
+ CancellationTokenRegistration reg = default;
+ if (cancellationToken.CanBeCanceled)
{
- // This optimization only makes sense if we don't have anything buffered
- if (UseZeroByteReads && _bufferedBytes == 0)
+ reg = cancellationToken.UnsafeRegister(state => ((StreamPipeReader)state!).Cancel(), reader);
+ }
+
+ using (reg)
+ {
+ var isCanceled = false;
+ try
{
- // Wait for data by doing 0 byte read before
- await InnerStream.ReadAsync(Memory<byte>.Empty, cancellationToken).ConfigureAwait(false);
- }
+ // This optimization only makes sense if we don't have anything buffered
+ if (reader.UseZeroByteReads && reader._bufferedBytes == 0)
+ {
+ // Wait for data by doing 0 byte read before
+ await reader.InnerStream.ReadAsync(Memory<byte>.Empty, tokenSource.Token).ConfigureAwait(false);
+ }
- AllocateReadTail();
+ reader.AllocateReadTail();
- Memory<byte> buffer = _readTail!.AvailableMemory.Slice(_readTail.End);
+ Memory<byte> buffer = reader._readTail!.AvailableMemory.Slice(reader._readTail.End);
- int length = await InnerStream.ReadAsync(buffer, tokenSource.Token).ConfigureAwait(false);
+ int length = await reader.InnerStream.ReadAsync(buffer, tokenSource.Token).ConfigureAwait(false);
- Debug.Assert(length + _readTail.End <= _readTail.AvailableMemory.Length);
+ Debug.Assert(length + reader._readTail.End <= reader._readTail.AvailableMemory.Length);
- _readTail.End += length;
- _bufferedBytes += length;
+ reader._readTail.End += length;
+ reader._bufferedBytes += length;
- if (length == 0)
+ if (length == 0)
+ {
+ reader._isStreamCompleted = true;
+ }
+ }
+ catch (OperationCanceledException)
{
- _isStreamCompleted = true;
+ reader.ClearCancellationToken();
+
+ if (tokenSource.IsCancellationRequested && !cancellationToken.IsCancellationRequested)
+ {
+ // Catch cancellation and translate it into setting isCanceled = true
+ isCanceled = true;
+ }
+ else
+ {
+ throw;
+ }
+
}
+
+ return new ReadResult(reader.GetCurrentReadOnlySequence(), isCanceled, reader._isStreamCompleted);
}
- catch (OperationCanceledException)
+ }
+ }
+
+ protected override ValueTask<ReadResult> ReadAtLeastAsyncCore(int minimumSize, CancellationToken cancellationToken)
+ {
+ // TODO ReadyAsync needs to throw if there are overlapping reads.
+ ThrowIfCompleted();
+
+ cancellationToken.ThrowIfCancellationRequested();
+
+ // PERF: store InternalTokenSource locally to avoid querying it twice (which acquires a lock)
+ CancellationTokenSource tokenSource = InternalTokenSource;
+ if (TryReadInternal(tokenSource, out ReadResult readResult))
+ {
+ if (readResult.Buffer.Length >= minimumSize || readResult.IsCompleted || readResult.IsCanceled)
{
- ClearCancellationToken();
+ return new ValueTask<ReadResult>(readResult);
+ }
+ }
+
+ if (_isStreamCompleted)
+ {
+ ReadResult completedResult = new ReadResult(buffer: default, isCanceled: false, isCompleted: true);
+ return new ValueTask<ReadResult>(completedResult);
+ }
- if (tokenSource.IsCancellationRequested && !cancellationToken.IsCancellationRequested)
+ return Core(this, minimumSize, tokenSource, cancellationToken);
+
+ static async ValueTask<ReadResult> Core(StreamPipeReader reader, int minimumSize, CancellationTokenSource tokenSource, CancellationToken cancellationToken)
+ {
+ CancellationTokenRegistration reg = default;
+ if (cancellationToken.CanBeCanceled)
+ {
+ reg = cancellationToken.UnsafeRegister(state => ((StreamPipeReader)state!).Cancel(), reader);
+ }
+
+ using (reg)
+ {
+ var isCanceled = false;
+ try
{
- // Catch cancellation and translate it into setting isCanceled = true
- isCanceled = true;
+ // This optimization only makes sense if we don't have anything buffered
+ if (reader.UseZeroByteReads && reader._bufferedBytes == 0)
+ {
+ // Wait for data by doing 0 byte read before
+ await reader.InnerStream.ReadAsync(Memory<byte>.Empty, tokenSource.Token).ConfigureAwait(false);
+ }
+
+ do
+ {
+ reader.AllocateReadTail(minimumSize);
+
+ Memory<byte> buffer = reader._readTail!.AvailableMemory.Slice(reader._readTail.End);
+
+ int length = await reader.InnerStream.ReadAsync(buffer, tokenSource.Token).ConfigureAwait(false);
+
+ Debug.Assert(length + reader._readTail.End <= reader._readTail.AvailableMemory.Length);
+
+ reader._readTail.End += length;
+ reader._bufferedBytes += length;
+
+ if (length == 0)
+ {
+ reader._isStreamCompleted = true;
+ break;
+ }
+ } while (reader._bufferedBytes < minimumSize);
}
- else
+ catch (OperationCanceledException)
{
- throw;
+ reader.ClearCancellationToken();
+
+ if (tokenSource.IsCancellationRequested && !cancellationToken.IsCancellationRequested)
+ {
+ // Catch cancellation and translate it into setting isCanceled = true
+ isCanceled = true;
+ }
+ else
+ {
+ throw;
+ }
+
}
+ return new ReadResult(reader.GetCurrentReadOnlySequence(), isCanceled, reader._isStreamCompleted);
}
-
- return new ReadResult(GetCurrentReadOnlySequence(), isCanceled, _isStreamCompleted);
}
}
return new ReadOnlySequence<byte>(_readHead, _readIndex, _readTail, _readTail.End);
}
- private void AllocateReadTail()
+ private void AllocateReadTail(int? minimumSize = null)
{
if (_readHead == null)
{
Debug.Assert(_readTail == null);
- _readHead = AllocateSegment();
+ _readHead = AllocateSegment(minimumSize);
_readTail = _readHead;
}
else
Debug.Assert(_readTail != null);
if (_readTail.WritableBytes < MinimumReadThreshold)
{
- BufferSegment nextSegment = AllocateSegment();
+ BufferSegment nextSegment = AllocateSegment(minimumSize);
_readTail.SetNext(nextSegment);
_readTail = nextSegment;
}
}
}
- private BufferSegment AllocateSegment()
+ private BufferSegment AllocateSegment(int? minimumSize = null)
{
BufferSegment nextSegment = CreateSegmentUnsynchronized();
- if (_options.IsDefaultSharedMemoryPool)
+ var bufferSize = minimumSize ?? BufferSize;
+ int maxSize = !_options.IsDefaultSharedMemoryPool ? _options.Pool.MaxBufferSize : -1;
+
+ if (bufferSize <= maxSize)
{
- nextSegment.SetOwnedMemory(ArrayPool<byte>.Shared.Rent(BufferSize));
+ // Use the specified pool as it fits.
+ int sizeToRequest = GetSegmentSize(bufferSize, maxSize);
+ nextSegment.SetOwnedMemory(_options.Pool.Rent(sizeToRequest));
}
else
{
- nextSegment.SetOwnedMemory(Pool.Rent(BufferSize));
+ // Use the array pool
+ int sizeToRequest = GetSegmentSize(bufferSize, MaxBufferSize);
+ nextSegment.SetOwnedMemory(ArrayPool<byte>.Shared.Rent(sizeToRequest));
}
return nextSegment;
}
+ private int GetSegmentSize(int sizeHint, int maxBufferSize)
+ {
+ // First we need to handle case where hint is smaller than minimum segment size
+ sizeHint = Math.Max(BufferSize, sizeHint);
+ // After that adjust it to fit into pools max buffer size
+ int adjustedToMaximumSize = Math.Min(maxBufferSize, sizeHint);
+ return adjustedToMaximumSize;
+ }
+
private BufferSegment CreateSegmentUnsynchronized()
{
if (_bufferSegmentPool.TryPop(out BufferSegment? segment))
public class StreamPipeReaderOptions
{
private const int DefaultBufferSize = 4096;
+ internal const int DefaultMaxBufferSize = 2048 * 1024;
private const int DefaultMinimumReadSize = 1024;
internal static readonly StreamPipeReaderOptions s_default = new StreamPipeReaderOptions();
/// <value>The buffer size.</value>
public int BufferSize { get; }
+ /// <summary>Gets the maximum buffer size to use when renting memory from the <see cref="System.IO.Pipelines.StreamPipeReaderOptions.Pool" />.</summary>
+ /// <value>The maximum buffer size.</value>
+ internal int MaxBufferSize { get; } = DefaultMaxBufferSize;
+
/// <summary>Gets the threshold of remaining bytes in the buffer before a new buffer is allocated.</summary>
/// <value>The minimum read size.</value>
public int MinimumReadSize { get; }
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+namespace System.IO.Pipelines.Tests
+{
+ public class BasePipeReaderReadAtLeastAsyncTests : ReadAtLeastAsyncTests
+ {
+ private PipeReader? _pipeReader;
+ protected override PipeReader PipeReader => _pipeReader ?? (_pipeReader = new BasePipeReader(Pipe.Reader));
+ }
+}
_disposed = true;
}
- public override int MaxBufferSize => 4096;
+ internal const int DefaultMaxBufferSize = 4096;
+ public override int MaxBufferSize => DefaultMaxBufferSize;
internal void CheckDisposed()
{
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Buffers;
+using System.Linq;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Xunit;
+
+namespace System.IO.Pipelines.Tests
+{
+ public class ReadAtLeastAsyncTests
+ {
+ private static readonly PipeOptions s_testOptions = new PipeOptions(readerScheduler: PipeScheduler.Inline, useSynchronizationContext: false);
+
+ protected Pipe Pipe { get; set; }
+ protected virtual PipeReader PipeReader => Pipe.Reader;
+
+ public ReadAtLeastAsyncTests()
+ {
+ Pipe = new Pipe(s_testOptions);
+ }
+
+ protected virtual void SetPipeReaderOptions(MemoryPool<byte>? pool = null, int bufferSize = -1)
+ {
+ PipeOptions options = new PipeOptions(pool, readerScheduler: PipeScheduler.Inline, useSynchronizationContext: false , minimumSegmentSize: bufferSize);
+ Pipe = new Pipe(options);
+ }
+
+ [Fact]
+ public async Task CanWriteAndReadAtLeast()
+ {
+ byte[] bytes = Encoding.ASCII.GetBytes("Hello World");
+
+ await Pipe.Writer.WriteAsync(bytes);
+ ReadResult result = await PipeReader.ReadAtLeastAsync(11);
+ ReadOnlySequence<byte> buffer = result.Buffer;
+
+ Assert.Equal(11, buffer.Length);
+ Assert.Equal("Hello World", Encoding.ASCII.GetString(buffer.ToArray()));
+
+ PipeReader.AdvanceTo(buffer.End);
+ }
+
+ [Fact]
+ public async Task ReadAtLeastShouldNotCompleteIfWriterWroteLessThanMinimum()
+ {
+ byte[] bytes = Encoding.ASCII.GetBytes("Hello World");
+
+ await Pipe.Writer.WriteAsync(bytes.AsMemory(0, 5));
+ ValueTask<ReadResult> task = PipeReader.ReadAtLeastAsync(11);
+
+ Assert.False(task.IsCompleted);
+
+ await Pipe.Writer.WriteAsync(bytes.AsMemory(5));
+
+ ReadResult result = await task;
+
+ ReadOnlySequence<byte> buffer = result.Buffer;
+
+ Assert.Equal(11, buffer.Length);
+ Assert.Equal("Hello World", Encoding.ASCII.GetString(buffer.ToArray()));
+
+ PipeReader.AdvanceTo(buffer.End);
+ }
+
+ [Fact]
+ public async Task CanAlternateReadAtLeastAndRead()
+ {
+ byte[] bytes = Encoding.ASCII.GetBytes("Hello World");
+
+ await Pipe.Writer.WriteAsync(bytes.AsMemory(0, 5));
+ ReadResult result = await PipeReader.ReadAtLeastAsync(3);
+ ReadOnlySequence<byte> buffer = result.Buffer;
+
+ Assert.Equal(5, buffer.Length);
+ Assert.Equal("Hello", Encoding.ASCII.GetString(buffer.ToArray()));
+
+ PipeReader.AdvanceTo(buffer.End);
+
+ await Pipe.Writer.WriteAsync(bytes.AsMemory(5));
+ result = await PipeReader.ReadAsync();
+ buffer = result.Buffer;
+
+ Assert.Equal(6, buffer.Length);
+ Assert.Equal(" World", Encoding.ASCII.GetString(buffer.ToArray()));
+
+ PipeReader.AdvanceTo(buffer.End);
+ }
+
+ [Fact]
+ public async Task ReadAtLeastReturnsIfCompleted()
+ {
+ Pipe.Writer.Complete();
+
+ // Make sure we get the same results (state transitions are working)
+ for (int i = 0; i < 3; i++)
+ {
+ ReadResult result = await PipeReader.ReadAtLeastAsync(100);
+
+ Assert.True(result.IsCompleted);
+
+ PipeReader.AdvanceTo(result.Buffer.End);
+ }
+ }
+
+ [Theory]
+ [InlineData(-1, false)]
+ [InlineData(-1, true)]
+ [InlineData(5, false)]
+ [InlineData(5, true)]
+ public async Task CanReadAtLeast(int bufferSize, bool bufferedRead)
+ {
+ SetPipeReaderOptions(bufferSize: bufferSize);
+ await Pipe.Writer.WriteAsync(Encoding.ASCII.GetBytes("Hello Pipelines World"));
+
+ if (bufferedRead)
+ {
+ ReadResult bufferedReadResult = await PipeReader.ReadAsync();
+ Assert.NotEqual(0, bufferedReadResult.Buffer.Length);
+ PipeReader.AdvanceTo(bufferedReadResult.Buffer.Start);
+ }
+
+ ReadResult readResult = await PipeReader.ReadAtLeastAsync(20);
+ ReadOnlySequence<byte> buffer = readResult.Buffer;
+
+ Assert.Equal(21, buffer.Length);
+
+ var isSingleSegment = bufferSize == -1;
+ // Optimization in StreamPipeReader.ReadAtLeastAsync()
+ if (PipeReader is StreamPipeReader) isSingleSegment |= !bufferedRead;
+ Assert.Equal(isSingleSegment, buffer.IsSingleSegment);
+
+ Assert.Equal("Hello Pipelines World", Encoding.ASCII.GetString(buffer.ToArray()));
+
+ PipeReader.AdvanceTo(buffer.End);
+ PipeReader.Complete();
+ }
+
+ [Fact]
+ public Task ReadAtLeastAsyncThrowsIfPassedCanceledCancellationToken()
+ {
+ return Assert.ThrowsAsync<OperationCanceledException>(async () => await PipeReader.ReadAtLeastAsync(0, new CancellationToken(true)));
+ }
+
+ [Fact]
+ public async Task WriteAndCancellingPendingReadBeforeReadAtLeastAsync()
+ {
+ byte[] bytes = Encoding.ASCII.GetBytes("Hello World");
+ PipeWriter output = Pipe.Writer;
+ output.Write(bytes);
+ await output.FlushAsync();
+
+ PipeReader.CancelPendingRead();
+
+ ReadResult result = await PipeReader.ReadAtLeastAsync(1000);
+ ReadOnlySequence<byte> buffer = result.Buffer;
+
+ Assert.False(result.IsCompleted);
+ Assert.True(result.IsCanceled);
+ PipeReader.AdvanceTo(buffer.End);
+ }
+ }
+}
_pipe.Reader.AdvanceTo(buffer.End);
}
- [Theory]
- [InlineData(false)]
- [InlineData(true)]
- public async Task CanWriteAndReadAtLeast(bool baseImplementation)
- {
- byte[] bytes = Encoding.ASCII.GetBytes("Hello World");
- var reader = baseImplementation ? new BasePipeReader(_pipe.Reader) : _pipe.Reader;
-
- await _pipe.Writer.WriteAsync(bytes);
- ReadResult result = await reader.ReadAtLeastAsync(11);
- ReadOnlySequence<byte> buffer = result.Buffer;
-
- Assert.Equal(11, buffer.Length);
- Assert.Equal("Hello World", Encoding.ASCII.GetString(buffer.ToArray()));
-
- reader.AdvanceTo(buffer.End);
- }
-
- [Theory]
- [InlineData(true)]
- [InlineData(false)]
- public async Task ReadAtLeastShouldNotCompleteIfWriterWroteLessThanMinimum(bool baseImplementation)
- {
- byte[] bytes = Encoding.ASCII.GetBytes("Hello World");
- var reader = baseImplementation ? new BasePipeReader(_pipe.Reader) : _pipe.Reader;
-
- await _pipe.Writer.WriteAsync(bytes.AsMemory(0, 5));
- ValueTask<ReadResult> task = reader.ReadAtLeastAsync(11);
-
- Assert.False(task.IsCompleted);
-
- await _pipe.Writer.WriteAsync(bytes.AsMemory(5));
-
- ReadResult result = await task;
-
- ReadOnlySequence<byte> buffer = result.Buffer;
-
- Assert.Equal(11, buffer.Length);
- Assert.Equal("Hello World", Encoding.ASCII.GetString(buffer.ToArray()));
-
- reader.AdvanceTo(buffer.End);
- }
-
- [Theory]
- [InlineData(true)]
- [InlineData(false)]
- public async Task CanAlternateReadAtLeastAndRead(bool baseImplementation)
- {
- byte[] bytes = Encoding.ASCII.GetBytes("Hello World");
- var reader = baseImplementation ? new BasePipeReader(_pipe.Reader) : _pipe.Reader;
-
- await _pipe.Writer.WriteAsync(bytes.AsMemory(0, 5));
- ReadResult result = await reader.ReadAtLeastAsync(3);
- ReadOnlySequence<byte> buffer = result.Buffer;
-
- Assert.Equal(5, buffer.Length);
- Assert.Equal("Hello", Encoding.ASCII.GetString(buffer.ToArray()));
-
- reader.AdvanceTo(buffer.End);
-
- await _pipe.Writer.WriteAsync(bytes.AsMemory(5));
- result = await reader.ReadAsync();
- buffer = result.Buffer;
-
- Assert.Equal(6, buffer.Length);
- Assert.Equal(" World", Encoding.ASCII.GetString(buffer.ToArray()));
-
- reader.AdvanceTo(buffer.End);
- }
-
[Fact]
public async Task AdvanceResetsCommitHeadIndex()
{
_pipe.Reader.AdvanceTo(result.Buffer.End);
}
- [Theory]
- [InlineData(true)]
- [InlineData(false)]
- public async Task ReadAtLeastReturnsIfCompleted(bool baseImplementation)
- {
- var reader = baseImplementation ? new BasePipeReader(_pipe.Reader) : _pipe.Reader;
-
- _pipe.Writer.Complete();
-
- // Make sure we get the same results (state transitions are working)
- for (int i = 0; i < 3; i++)
- {
- ReadResult result = await reader.ReadAtLeastAsync(100);
-
- Assert.True(result.IsCompleted);
-
- reader.AdvanceTo(result.Buffer.End);
- }
- }
-
[Fact]
public void WhenTryReadReturnsFalseDontNeedToCallAdvance()
{
}
[Fact]
- public void ReadAtLeastAsyncThrowsIfPassedCanceledCancellationToken()
- {
- var cancellationTokenSource = new CancellationTokenSource();
- cancellationTokenSource.Cancel();
-
- Assert.Throws<OperationCanceledException>(() => Pipe.Reader.ReadAtLeastAsync(0, cancellationTokenSource.Token));
- }
-
- [Fact]
public async Task ReadAsyncWithNewCancellationTokenNotAffectedByPrevious()
{
await Pipe.Writer.WriteAsync(new byte[] { 0 });
Pipe.Reader.AdvanceTo(buffer.End, buffer.End);
}
- [Theory]
- [InlineData(true)]
- [InlineData(false)]
- public async Task WriteAndCancellingPendingReadBeforeReadAtLeastAsync(bool baseImplementation)
- {
- var reader = baseImplementation ? new BasePipeReader(Pipe.Reader) : Pipe.Reader;
-
- byte[] bytes = Encoding.ASCII.GetBytes("Hello World");
- PipeWriter output = Pipe.Writer;
- output.Write(bytes);
- await output.FlushAsync();
-
- reader.CancelPendingRead();
-
- ReadResult result = await reader.ReadAtLeastAsync(1000);
- ReadOnlySequence<byte> buffer = result.Buffer;
-
- Assert.False(result.IsCompleted);
- Assert.True(result.IsCanceled);
- Assert.Equal("Hello World", Encoding.ASCII.GetString(buffer.ToArray()));
- reader.AdvanceTo(buffer.End);
- }
-
[Fact]
public async Task ReadAsyncIsNotCancelledWhenCancellationTokenCancelledBetweenReads()
{
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Buffers;
+using System.Threading.Tasks;
+using Xunit;
+
+namespace System.IO.Pipelines.Tests
+{
+ public class StreamPipeReaderReadAtLeastAsyncTests : ReadAtLeastAsyncTests
+ {
+ private PipeReader? _pipeReader;
+ protected override PipeReader PipeReader => _pipeReader ?? (_pipeReader = PipeReader.Create(Pipe.Reader.AsStream()));
+
+ protected override void SetPipeReaderOptions(MemoryPool<byte>? pool = null, int bufferSize = -1)
+ {
+ _pipeReader = PipeReader.Create(Pipe.Reader.AsStream(), new StreamPipeReaderOptions(pool, bufferSize));
+ }
+
+ private static Func<DisposeTrackingBufferPool> CustomPoolFunc = () => new DisposeTrackingBufferPool();
+ public static TheoryData<MemoryPool<byte>?, int, bool, bool> TestData =>
+ new TheoryData<MemoryPool<byte>?, int, bool, bool>
+ {
+ // pool, bufferSize, isSingleSegment, isFromCustomPool
+ { default, 1, true, false },
+ { default, StreamPipeReaderOptions.DefaultMaxBufferSize, true, false },
+ { default, StreamPipeReaderOptions.DefaultMaxBufferSize + 1, false, false },
+
+ { CustomPoolFunc(), 1, true, true },
+ { CustomPoolFunc(), TestMemoryPool.DefaultMaxBufferSize, true, true },
+ { CustomPoolFunc(), TestMemoryPool.DefaultMaxBufferSize + 1, true, false },
+ { CustomPoolFunc(), StreamPipeReaderOptions.DefaultMaxBufferSize, true, false },
+ { CustomPoolFunc(), StreamPipeReaderOptions.DefaultMaxBufferSize + 1, false, false },
+ };
+
+ [Theory]
+ [MemberData(nameof(TestData))]
+ public async Task ReadAtLeastAsyncSegmentSizeLessThanMaxBufferSize(DisposeTrackingBufferPool? pool, int bufferSize, bool isSingleSegment, bool isFromCustomPool)
+ {
+ SetPipeReaderOptions(pool);
+ Pipe.Writer.WriteEmpty(bufferSize);
+ var task = Pipe.Writer.FlushAsync();
+ ReadResult readResult = await PipeReader.ReadAtLeastAsync(bufferSize);
+ await task;
+
+ Assert.Equal(isSingleSegment, readResult.Buffer.IsSingleSegment);
+ Assert.Equal(isFromCustomPool, (pool?.CurrentlyRentedBlocks ?? 0) != 0);
+ Assert.Equal(bufferSize, readResult.Buffer.Length);
+ }
+ }
+}
}
[Fact]
- public async Task CanReadAtLeast()
- {
- var stream = new MemoryStream(Encoding.ASCII.GetBytes("Hello World"));
- var reader = PipeReader.Create(stream);
-
- ReadResult readResult = await reader.ReadAtLeastAsync(10);
- ReadOnlySequence<byte> buffer = readResult.Buffer;
-
- Assert.Equal(11, buffer.Length);
- Assert.True(buffer.IsSingleSegment);
- Assert.Equal("Hello World", Encoding.ASCII.GetString(buffer.ToArray()));
-
- reader.AdvanceTo(buffer.End);
- reader.Complete();
- }
-
- [Fact]
public async Task TryReadReturnsTrueIfBufferedBytesAndNotExaminedEverything()
{
var stream = new MemoryStream(Encoding.ASCII.GetBytes("Hello World"));
-<Project Sdk="Microsoft.NET.Sdk">
+<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<TargetFrameworks>$(NetCoreAppCurrent);net461</TargetFrameworks>
<Compile Include="Infrastructure\CancelledReadsStream.cs" />
<Compile Include="BackpressureTests.cs" />
<Compile Include="Infrastructure\ObserveDisposeStream.cs" />
+ <Compile Include="PipeReaderReadAtLeastAsyncTests.cs" />
<Compile Include="PipeReaderCopyToAsyncTests.cs" />
<Compile Include="FlushAsyncCancellationTests.cs" />
<Compile Include="FlushAsyncCompletionTests.cs" />
<Compile Include="ReadResultTests.cs" />
<Compile Include="SchedulerFacts.cs" />
<Compile Include="SequencePipeReaderTests.cs" />
+ <Compile Include="BasePipeReaderReadAtLeastAsyncTests.cs" />
+ <Compile Include="StreamPipeReaderReadAtLeastAsyncTests.cs" />
<Compile Include="StreamPipeReaderCopyToAsyncTests.cs" />
<Compile Include="StreamPipeReaderTests.cs" />
<Compile Include="Infrastructure\TestMemoryPool.cs" />
<Compile Include="PipeReaderStreamTests.nonnetstandard.cs" />
<Compile Include="PipeReaderWriterStreamTests.nonnetstandard.cs" />
</ItemGroup>
- <ItemGroup Condition="'$(TargetFramework)' == '$(NetCoreAppCurrent)'">
+ <ItemGroup>
<!-- Some internal types are needed, so we reference the implementation assembly, rather than the reference assembly. -->
<ProjectReference Include="..\src\System.IO.Pipelines.csproj" SkipUseReferenceAssembly="true" />
</ItemGroup>
- <ItemGroup Condition="'$(TargetFramework)' == 'net461'">
- <ProjectReference Include="..\src\System.IO.Pipelines.csproj" />
- </ItemGroup>
<ItemGroup Condition="'$(TargetFramework)' == '$(NetCoreAppCurrent)'">
<ProjectReference Include="$(CommonTestPath)StreamConformanceTests\StreamConformanceTests.csproj" />
</ItemGroup>