protected PipeReader() { }
public abstract void AdvanceTo(System.SequencePosition consumed);
public abstract void AdvanceTo(System.SequencePosition consumed, System.SequencePosition examined);
+ public virtual System.IO.Stream AsStream() { throw null; }
public abstract void CancelPendingRead();
public abstract void Complete(System.Exception exception = null);
public virtual System.Threading.Tasks.Task CopyToAsync(System.IO.Stream destination, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
{
protected PipeWriter() { }
public abstract void Advance(int bytes);
+ public virtual System.IO.Stream AsStream() { throw null; }
public abstract void CancelPendingFlush();
public abstract void Complete(System.Exception exception = null);
protected internal virtual System.Threading.Tasks.Task CopyFromAsync(System.IO.Stream source, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
<?xml version="1.0" encoding="utf-8"?>
<root>
- <!--
- Microsoft ResX Schema
-
+ <!--
+ Microsoft ResX Schema
+
Version 2.0
-
- The primary goals of this format is to allow a simple XML format
- that is mostly human readable. The generation and parsing of the
- various data types are done through the TypeConverter classes
+
+ The primary goals of this format is to allow a simple XML format
+ that is mostly human readable. The generation and parsing of the
+ various data types are done through the TypeConverter classes
associated with the data types.
-
+
Example:
-
+
... ado.net/XML headers & schema ...
<resheader name="resmimetype">text/microsoft-resx</resheader>
<resheader name="version">2.0</resheader>
<value>[base64 mime encoded string representing a byte array form of the .NET Framework object]</value>
<comment>This is a comment</comment>
</data>
-
- There are any number of "resheader" rows that contain simple
+
+ There are any number of "resheader" rows that contain simple
name/value pairs.
-
- Each data row contains a name, and value. The row also contains a
- type or mimetype. Type corresponds to a .NET class that support
- text/value conversion through the TypeConverter architecture.
- Classes that don't support this are serialized and stored with the
+
+ Each data row contains a name, and value. The row also contains a
+ type or mimetype. Type corresponds to a .NET class that support
+ text/value conversion through the TypeConverter architecture.
+ Classes that don't support this are serialized and stored with the
mimetype set.
-
- The mimetype is used for serialized objects, and tells the
- ResXResourceReader how to depersist the object. This is currently not
+
+ The mimetype is used for serialized objects, and tells the
+ ResXResourceReader how to depersist the object. This is currently not
extensible. For a given mimetype the value must be set accordingly:
-
- Note - application/x-microsoft.net.object.binary.base64 is the format
- that the ResXResourceWriter will generate, however the reader can
+
+ Note - application/x-microsoft.net.object.binary.base64 is the format
+ that the ResXResourceWriter will generate, however the reader can
read any of the formats listed below.
-
+
mimetype: application/x-microsoft.net.object.binary.base64
- value : The object must be serialized with
+ value : The object must be serialized with
: System.Runtime.Serialization.Formatters.Binary.BinaryFormatter
: and then encoded with base64 encoding.
-
+
mimetype: application/x-microsoft.net.object.soap.base64
- value : The object must be serialized with
+ value : The object must be serialized with
: System.Runtime.Serialization.Formatters.Soap.SoapFormatter
: and then encoded with base64 encoding.
mimetype: application/x-microsoft.net.object.bytearray.base64
- value : The object must be serialized into a byte array
+ value : The object must be serialized into a byte array
: using a System.ComponentModel.TypeConverter
: and then encoded with base64 encoding.
-->
<data name="ConcurrentOperationsNotSupported" xml:space="preserve">
<value>Concurrent reads or writes are not supported.</value>
</data>
+ <data name="FlushCanceledOnPipeWriter" xml:space="preserve">
+ <value>Flush was canceled on underlying PipeWriter.</value>
+ </data>
<data name="GetResultBeforeCompleted" xml:space="preserve">
<value>Can't GetResult unless awaiter is completed.</value>
</data>
+ <data name="InvalidZeroByteRead" xml:space="preserve">
+ <value>The PipeReader returned 0 bytes when the ReadResult was not completed or canceled.</value>
+ </data>
<data name="NoReadingOperationToComplete" xml:space="preserve">
<value>No reading operation to complete.</value>
</data>
<data name="NoWritingOperation" xml:space="preserve">
<value>No writing operation. Make sure GetMemory() was called.</value>
</data>
+ <data name="ReadCanceledOnPipeReader" xml:space="preserve">
+ <value>Read was canceled on underlying PipeReader.</value>
+ </data>
<data name="ReaderAndWriterHasToBeCompleted" xml:space="preserve">
<value>Both reader and writer has to be completed to be able to reset the pipe.</value>
</data>
<data name="WritingAfterCompleted" xml:space="preserve">
<value>Writing is not allowed after writer was completed.</value>
</data>
-</root>
+</root>
\ No newline at end of file
<Configurations>netcoreapp-Debug;netcoreapp-Release;netstandard-Debug;netstandard-Release</Configurations>
</PropertyGroup>
<ItemGroup>
+ <Compile Include="$(CommonPath)\CoreLib\System\Threading\Tasks\TaskToApm.cs">
+ <Link>Common\CoreLib\System\Threading\Tasks\TaskToApm.cs</Link>
+ </Compile>
<Compile Include="Properties\InternalsVisibleTo.cs" />
<Compile Include="System\IO\Pipelines\BufferSegment.cs" />
<Compile Include="System\IO\Pipelines\CompletionData.cs" />
<Compile Include="System\IO\Pipelines\PipeOptions.cs" />
<Compile Include="System\IO\Pipelines\PipeReader.cs" />
<Compile Include="System\IO\Pipelines\PipeOperationState.cs" />
+ <Compile Include="System\IO\Pipelines\PipeReaderStream.cs" />
<Compile Include="System\IO\Pipelines\PipeScheduler.cs" />
<Compile Include="System\IO\Pipelines\PipeWriter.cs" />
+ <Compile Include="System\IO\Pipelines\PipeWriterStream.cs" />
<Compile Include="System\IO\Pipelines\ReadResult.cs" />
<Compile Include="System\IO\Pipelines\ResultFlags.cs" />
<Compile Include="System\IO\Pipelines\StreamPipeExtensions.cs" />
/// </summary>
public abstract partial class PipeReader
{
+ private PipeReaderStream _stream;
+
/// <summary>
/// Attempt to synchronously read data the <see cref="PipeReader"/>.
/// </summary>
/// </remarks>
public abstract void AdvanceTo(SequencePosition consumed, SequencePosition examined);
+ /// <summary>
+ ///
+ /// </summary>
+ /// <returns></returns>
+ public virtual Stream AsStream()
+ {
+ return _stream ?? (_stream = new PipeReaderStream(this));
+ }
+
/// <summary>
/// Cancel to currently pending or if none is pending next call to <see cref="ReadAsync"/>, without completing the <see cref="PipeReader"/>.
/// </summary>
if (result.IsCanceled)
{
- throw new OperationCanceledException();
+ ThrowHelper.ThrowOperationCanceledException_ReadCanceled();
}
while (buffer.TryGet(ref position, out ReadOnlyMemory<byte> memory))
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Buffers;
+using System.Diagnostics;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.IO.Pipelines
+{
+ internal sealed class PipeReaderStream : Stream
+ {
+ private readonly PipeReader _pipeReader;
+
+ public PipeReaderStream(PipeReader pipeReader)
+ {
+ Debug.Assert(pipeReader != null);
+ _pipeReader = pipeReader;
+ }
+
+ public override bool CanRead => true;
+
+ public override bool CanSeek => false;
+
+ public override bool CanWrite => false;
+
+ public override long Length => throw new NotSupportedException();
+
+ public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
+
+ public override void Flush()
+ {
+ }
+
+ public override int Read(byte[] buffer, int offset, int count)
+ {
+ return ReadAsync(buffer, offset, count).GetAwaiter().GetResult();
+ }
+
+ public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
+
+ public override void SetLength(long value) => throw new NotSupportedException();
+
+ public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
+
+ public sealed override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state) =>
+ TaskToApm.Begin(ReadAsync(buffer, offset, count, default), callback, state);
+
+ public sealed override int EndRead(IAsyncResult asyncResult) =>
+ TaskToApm.End<int>(asyncResult);
+
+ public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ {
+ return ReadAsyncInternal(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask();
+ }
+
+#if !netstandard
+ public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
+ {
+ return ReadAsyncInternal(buffer, cancellationToken);
+ }
+#endif
+
+ private async ValueTask<int> ReadAsyncInternal(Memory<byte> buffer, CancellationToken cancellationToken)
+ {
+ ReadResult result = await _pipeReader.ReadAsync(cancellationToken).ConfigureAwait(false);
+
+ if (result.IsCanceled)
+ {
+ ThrowHelper.ThrowOperationCanceledException_ReadCanceled();
+ }
+
+ ReadOnlySequence<byte> sequence = result.Buffer;
+ long bufferLength = sequence.Length;
+ SequencePosition consumed = sequence.Start;
+
+ try
+ {
+ if (bufferLength != 0)
+ {
+ int actual = (int)Math.Min(bufferLength, buffer.Length);
+
+ ReadOnlySequence<byte> slice = actual == bufferLength ? sequence : sequence.Slice(0, actual);
+ consumed = slice.End;
+ slice.CopyTo(buffer.Span);
+
+ return actual;
+ }
+
+ if (result.IsCompleted)
+ {
+ return 0;
+ }
+ }
+ finally
+ {
+ _pipeReader.AdvanceTo(consumed);
+ }
+
+ // This is a buggy PipeReader implementation that returns 0 byte reads even though the PipeReader
+ // isn't completed or canceled
+ ThrowHelper.ThrowInvalidOperationException_InvalidZeroByteRead();
+ return 0;
+ }
+
+ public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
+ {
+ // Delegate to CopyToAsync on the PipeReader
+ return _pipeReader.CopyToAsync(destination, cancellationToken);
+ }
+ }
+}
/// </summary>
public abstract partial class PipeWriter : IBufferWriter<byte>
{
+ private PipeWriterStream _stream;
+
/// <summary>
/// Marks the <see cref="PipeWriter"/> as being complete, meaning no more items will be written to it.
/// </summary>
/// <inheritdoc />
public abstract Span<byte> GetSpan(int sizeHint = 0);
+ /// <summary>
+ ///
+ /// </summary>
+ /// <returns></returns>
+ public virtual Stream AsStream()
+ {
+ return _stream ?? (_stream = new PipeWriterStream(this));
+ }
+
/// <summary>
/// Writes <paramref name="source"/> to the pipe and makes data accessible to <see cref="PipeReader"/>
/// </summary>
if (result.IsCanceled)
{
- throw new OperationCanceledException();
+ ThrowHelper.ThrowOperationCanceledException_FlushCanceled();
}
if (result.IsCompleted)
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System.Diagnostics;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.IO.Pipelines
+{
+ internal sealed class PipeWriterStream : Stream
+ {
+ private readonly PipeWriter _pipeWriter;
+
+ public PipeWriterStream(PipeWriter pipeWriter)
+ {
+ Debug.Assert(pipeWriter != null);
+ _pipeWriter = pipeWriter;
+ }
+
+ public override bool CanRead => false;
+
+ public override bool CanSeek => false;
+
+ public override bool CanWrite => true;
+
+ public override long Length => throw new NotSupportedException();
+
+ public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
+
+ public override void Flush()
+ {
+ FlushAsync().GetAwaiter().GetResult();
+ }
+
+ public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
+
+ public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
+
+ public override void SetLength(long value) => throw new NotSupportedException();
+
+ public sealed override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state) =>
+ TaskToApm.Begin(WriteAsync(buffer, offset, count, default), callback, state);
+
+ public sealed override void EndWrite(IAsyncResult asyncResult) =>
+ TaskToApm.End(asyncResult);
+
+ public override void Write(byte[] buffer, int offset, int count)
+ {
+ WriteAsync(buffer, offset, count).GetAwaiter().GetResult();
+ }
+
+ public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ {
+ ValueTask<FlushResult> valueTask = _pipeWriter.WriteAsync(new ReadOnlyMemory<byte>(buffer, offset, count), cancellationToken);
+
+ return GetFlushResultAsTask(valueTask);
+ }
+
+#if !netstandard
+ public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
+ {
+ ValueTask<FlushResult> valueTask = _pipeWriter.WriteAsync(buffer, cancellationToken);
+
+ return new ValueTask(GetFlushResultAsTask(valueTask));
+ }
+#endif
+
+ public override Task FlushAsync(CancellationToken cancellationToken)
+ {
+ ValueTask<FlushResult> valueTask = _pipeWriter.FlushAsync(cancellationToken);
+
+ return GetFlushResultAsTask(valueTask);
+ }
+
+ private static Task GetFlushResultAsTask(ValueTask<FlushResult> valueTask)
+ {
+ if (valueTask.IsCompletedSuccessfully)
+ {
+ FlushResult result = valueTask.Result;
+ if (result.IsCanceled)
+ {
+ ThrowHelper.ThrowOperationCanceledException_FlushCanceled();
+ }
+
+ return Task.CompletedTask;
+ }
+
+ static async Task AwaitTask(ValueTask<FlushResult> valueTask)
+ {
+ FlushResult result = await valueTask.ConfigureAwait(false);
+
+ if (result.IsCanceled)
+ {
+ ThrowHelper.ThrowOperationCanceledException_FlushCanceled();
+ }
+ }
+
+ return AwaitTask(valueTask);
+ }
+ }
+}
+
public static void ThrowInvalidOperationException_ResetIncompleteReaderWriter() => throw CreateInvalidOperationException_ResetIncompleteReaderWriter();
[MethodImpl(MethodImplOptions.NoInlining)]
public static Exception CreateInvalidOperationException_ResetIncompleteReaderWriter() => new InvalidOperationException(SR.ReaderAndWriterHasToBeCompleted);
+
+ public static void ThrowOperationCanceledException_ReadCanceled() => throw CreateOperationCanceledException_ReadCanceled();
+ [MethodImpl(MethodImplOptions.NoInlining)]
+ public static Exception CreateOperationCanceledException_ReadCanceled() => new OperationCanceledException(SR.ReadCanceledOnPipeReader);
+
+ public static void ThrowOperationCanceledException_FlushCanceled() => throw CreateOperationCanceledException_FlushCanceled();
+ [MethodImpl(MethodImplOptions.NoInlining)]
+ public static Exception CreateOperationCanceledException_FlushCanceled() => new OperationCanceledException(SR.FlushCanceledOnPipeWriter);
+
+ public static void ThrowInvalidOperationException_InvalidZeroByteRead() => throw CreateInvalidOperationException_InvalidZeroByteRead();
+ [MethodImpl(MethodImplOptions.NoInlining)]
+ public static Exception CreateInvalidOperationException_InvalidZeroByteRead() => new InvalidOperationException(SR.InvalidZeroByteRead);
}
internal enum ExceptionArgument
--- /dev/null
+using System;
+using System.Buffers;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Xunit;
+
+namespace System.IO.Pipelines.Tests
+{
+ public class PipeReaderStreamTests
+ {
+ public delegate Task<int> ReadAsyncDelegate(Stream stream, byte[] data);
+
+ [Theory]
+ [MemberData(nameof(ReadCalls))]
+ public async Task ReadingFromPipeReaderStreamReadsFromUnderlyingPipeReader(ReadAsyncDelegate readAsync)
+ {
+ byte[] helloBytes = Encoding.ASCII.GetBytes("Hello World");
+ var pipe = new Pipe();
+ await pipe.Writer.WriteAsync(helloBytes);
+ pipe.Writer.Complete();
+
+ var stream = new PipeReaderStream(pipe.Reader);
+
+ var buffer = new byte[1024];
+ int read = await readAsync(stream, buffer);
+
+ Assert.Equal(helloBytes, buffer.AsSpan(0, read).ToArray());
+ pipe.Reader.Complete();
+ }
+
+ [Theory]
+ [MemberData(nameof(ReadCalls))]
+ public async Task AsStreamReturnsPipeReaderStream(ReadAsyncDelegate readAsync)
+ {
+ byte[] helloBytes = Encoding.ASCII.GetBytes("Hello World");
+ var pipe = new Pipe();
+ await pipe.Writer.WriteAsync(helloBytes);
+ pipe.Writer.Complete();
+
+ Stream stream = pipe.Reader.AsStream();
+
+ var buffer = new byte[1024];
+ int read = await readAsync(stream, buffer);
+
+ Assert.Equal(helloBytes, buffer.AsSpan(0, read).ToArray());
+ pipe.Reader.Complete();
+ }
+
+ [Fact]
+ public async Task ReadingWithSmallerBufferWorks()
+ {
+ byte[] helloBytes = Encoding.ASCII.GetBytes("Hello World");
+ var pipe = new Pipe();
+ await pipe.Writer.WriteAsync(helloBytes);
+ pipe.Writer.Complete();
+
+ Stream stream = pipe.Reader.AsStream();
+
+ var buffer = new byte[5];
+ int read = await stream.ReadAsync(buffer);
+
+ Assert.Equal(5, read);
+ Assert.Equal(helloBytes.AsSpan(0, 5).ToArray(), buffer);
+
+ buffer = new byte[3];
+ read = await stream.ReadAsync(buffer);
+
+ Assert.Equal(3, read);
+ Assert.Equal(helloBytes.AsSpan(5, 3).ToArray(), buffer);
+
+ // Verify that the buffer is partially consumed and we can read the rest from the PipeReader directly
+ ReadResult result = await pipe.Reader.ReadAsync();
+ Assert.Equal(helloBytes.AsSpan(8).ToArray(), result.Buffer.ToArray());
+ pipe.Reader.AdvanceTo(result.Buffer.End);
+
+ pipe.Reader.Complete();
+ }
+
+ [Fact]
+ public async Task EndOfPipeReaderReturnsZeroBytesFromReadAsync()
+ {
+ var pipe = new Pipe();
+ Memory<byte> memory = pipe.Writer.GetMemory();
+ pipe.Writer.Advance(5);
+ pipe.Writer.Complete();
+
+ Stream stream = pipe.Reader.AsStream();
+
+ var buffer = new byte[5];
+ var read = await stream.ReadAsync(buffer);
+
+ Assert.Equal(5, read);
+
+ read = await stream.ReadAsync(buffer);
+
+ // Read again to make sure it always returns 0
+ Assert.Equal(0, read);
+
+ pipe.Reader.Complete();
+ }
+
+ [Fact]
+ public async Task BuggyPipeReaderImplementationThrows()
+ {
+ var pipeReader = new BuggyPipeReader();
+
+ Stream stream = pipeReader.AsStream();
+
+ await Assert.ThrowsAsync<InvalidOperationException>(async () => await stream.ReadAsync(new byte[5]));
+ }
+
+ [Fact]
+ public async Task WritingToPipeReaderStreamThrowsNotSupported()
+ {
+ var pipe = new Pipe();
+
+ Stream stream = pipe.Reader.AsStream();
+ Assert.False(stream.CanWrite);
+ Assert.False(stream.CanSeek);
+ Assert.True(stream.CanRead);
+ Assert.Throws<NotSupportedException>(() => { long length = stream.Length; });
+ Assert.Throws<NotSupportedException>(() => { long position = stream.Position; });
+ Assert.Throws<NotSupportedException>(() => stream.Seek(0, SeekOrigin.Begin));
+ Assert.Throws<NotSupportedException>(() => stream.Write(new byte[10], 0, 10));
+ await Assert.ThrowsAsync<NotSupportedException>(() => stream.WriteAsync(new byte[10], 0, 10));
+ await Assert.ThrowsAsync<NotSupportedException>(() => stream.WriteAsync(new byte[10]).AsTask());
+
+ pipe.Reader.Complete();
+ pipe.Writer.Complete();
+ }
+
+ [Fact]
+ public async Task CancellingPendingReadThrowsOperationCancelledException()
+ {
+ var pipe = new Pipe();
+
+ Stream stream = pipe.Reader.AsStream();
+ ValueTask<int> task = stream.ReadAsync(new byte[1024]);
+ Assert.False(task.IsCompleted);
+
+ pipe.Reader.CancelPendingRead();
+
+ await Assert.ThrowsAsync<OperationCanceledException>(async () => await task);
+ pipe.Writer.Complete();
+ pipe.Reader.Complete();
+ }
+
+ [Fact]
+ public async Task CanReadAfterCancellingPendingRead()
+ {
+ var pipe = new Pipe();
+
+ Stream stream = pipe.Reader.AsStream();
+ ValueTask<int> task = stream.ReadAsync(new byte[1024]);
+ Assert.False(task.IsCompleted);
+
+ pipe.Reader.CancelPendingRead();
+
+ await Assert.ThrowsAsync<OperationCanceledException>(async () => await task);
+ pipe.Writer.Complete();
+
+ ReadResult result = await pipe.Reader.ReadAsync();
+ Assert.True(result.IsCompleted);
+
+ pipe.Reader.Complete();
+ }
+
+ [Fact]
+ public async Task CancellationTokenFlowsToUnderlyingPipeReader()
+ {
+ var pipe = new Pipe();
+
+ Stream stream = pipe.Reader.AsStream();
+ var cts = new CancellationTokenSource();
+ ValueTask<int> task = stream.ReadAsync(new byte[1024], cts.Token);
+ Assert.False(task.IsCompleted);
+
+ cts.Cancel();
+
+ await Assert.ThrowsAsync<OperationCanceledException>(async () => await task);
+ pipe.Writer.Complete();
+ pipe.Reader.Complete();
+ }
+
+ [Fact]
+ public async Task DefaultPipeReaderImplementationReturnsPipeReaderStream()
+ {
+ var pipeReader = new TestPipeReader();
+ Stream stream = pipeReader.AsStream();
+
+ await stream.ReadAsync(new byte[10]);
+
+ Assert.True(pipeReader.ReadCalled);
+ Assert.True(pipeReader.AdvanceToCalled);
+ }
+
+ [Fact]
+ public void AsStreamReturnsSameInstance()
+ {
+ var pipeReader = new TestPipeReader();
+ Stream stream = pipeReader.AsStream();
+
+ Assert.Same(stream, pipeReader.AsStream());
+ }
+
+ public class BuggyPipeReader : PipeReader
+ {
+ public override void AdvanceTo(SequencePosition consumed)
+ {
+
+ }
+
+ public override void AdvanceTo(SequencePosition consumed, SequencePosition examined)
+ {
+
+ }
+
+ public override void CancelPendingRead()
+ {
+ throw new NotImplementedException();
+ }
+
+ public override void Complete(Exception exception = null)
+ {
+ throw new NotImplementedException();
+ }
+
+ public override void OnWriterCompleted(Action<Exception, object> callback, object state)
+ {
+ throw new NotImplementedException();
+ }
+
+ public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
+ {
+ // Returns a ReadResult with no buffer and with IsCompleted and IsCancelled false
+ return default;
+ }
+
+ public override bool TryRead(out ReadResult result)
+ {
+ throw new NotImplementedException();
+ }
+ }
+
+ public class TestPipeReader : PipeReader
+ {
+ public bool ReadCalled { get; set; }
+ public bool AdvanceToCalled { get; set; }
+
+ public override void AdvanceTo(SequencePosition consumed)
+ {
+ AdvanceToCalled = true;
+ }
+
+ public override void AdvanceTo(SequencePosition consumed, SequencePosition examined)
+ {
+ throw new NotImplementedException();
+ }
+
+ public override void CancelPendingRead()
+ {
+ throw new NotImplementedException();
+ }
+
+ public override void Complete(Exception exception = null)
+ {
+ throw new NotImplementedException();
+ }
+
+ public override void OnWriterCompleted(Action<Exception, object> callback, object state)
+ {
+ throw new NotImplementedException();
+ }
+
+ public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
+ {
+ ReadCalled = true;
+ return new ValueTask<ReadResult>(new ReadResult(default, isCanceled: false, isCompleted: true));
+ }
+
+ public override bool TryRead(out ReadResult result)
+ {
+ throw new NotImplementedException();
+ }
+ }
+
+ public static IEnumerable<object[]> ReadCalls
+ {
+ get
+ {
+ ReadAsyncDelegate readArrayAsync = (stream, data) =>
+ {
+ return stream.ReadAsync(data, 0, data.Length);
+ };
+
+ ReadAsyncDelegate readMemoryAsync = async (stream, data) =>
+ {
+ return await stream.ReadAsync(data);
+ };
+
+ ReadAsyncDelegate readMemoryAsyncWithThreadHop = async (stream, data) =>
+ {
+ await Task.Yield();
+
+ return await stream.ReadAsync(data);
+ };
+
+ ReadAsyncDelegate readArraySync = (stream, data) =>
+ {
+ return Task.FromResult(stream.Read(data, 0, data.Length));
+ };
+
+ ReadAsyncDelegate readSpanSync = (stream, data) =>
+ {
+ return Task.FromResult(stream.Read(data));
+ };
+
+ yield return new object[] { readArrayAsync };
+ yield return new object[] { readMemoryAsync };
+ yield return new object[] { readMemoryAsyncWithThreadHop };
+ yield return new object[] { readArraySync };
+ yield return new object[] { readSpanSync };
+ }
+ }
+ }
+}
--- /dev/null
+using System;
+using System.Buffers;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Xunit;
+
+namespace System.IO.Pipelines.Tests
+{
+ public class PipeWriterStreamTests
+ {
+ public delegate Task WriteAsyncDelegate(Stream stream, byte[] data);
+
+ [Theory]
+ [MemberData(nameof(WriteCalls))]
+ public async Task WritingToPipeStreamWritesToUnderlyingPipeWriter(WriteAsyncDelegate writeAsync)
+ {
+ byte[] helloBytes = Encoding.ASCII.GetBytes("Hello World");
+ var pipe = new Pipe();
+ var stream = new PipeWriterStream(pipe.Writer);
+
+ await writeAsync(stream, helloBytes);
+
+ ReadResult result = await pipe.Reader.ReadAsync();
+ Assert.Equal(helloBytes, result.Buffer.ToArray());
+ pipe.Reader.Complete();
+ pipe.Writer.Complete();
+ }
+
+ [Theory]
+ [MemberData(nameof(WriteCalls))]
+ public async Task AsStreamReturnsPipeWriterStream(WriteAsyncDelegate writeAsync)
+ {
+ byte[] helloBytes = Encoding.ASCII.GetBytes("Hello World");
+ var pipe = new Pipe();
+ Stream stream = pipe.Writer.AsStream();
+
+ await writeAsync(stream, helloBytes);
+
+ ReadResult result = await pipe.Reader.ReadAsync();
+ Assert.Equal(helloBytes, result.Buffer.ToArray());
+ pipe.Reader.Complete();
+ pipe.Writer.Complete();
+ }
+
+ [Fact]
+ public async Task FlushAsyncFlushesBufferedData()
+ {
+ byte[] helloBytes = Encoding.ASCII.GetBytes("Hello World");
+ var pipe = new Pipe();
+
+ Memory<byte> memory = pipe.Writer.GetMemory();
+ helloBytes.CopyTo(memory);
+ pipe.Writer.Advance(helloBytes.Length);
+
+ Stream stream = pipe.Writer.AsStream();
+ await stream.FlushAsync();
+
+ ReadResult result = await pipe.Reader.ReadAsync();
+ Assert.Equal(helloBytes, result.Buffer.ToArray());
+ pipe.Reader.Complete();
+ pipe.Writer.Complete();
+ }
+
+ [Fact]
+ public async Task ReadingFromPipeWriterStreamThrowsNotSupported()
+ {
+ byte[] helloBytes = Encoding.ASCII.GetBytes("Hello World");
+ var pipe = new Pipe();
+
+ Stream stream = pipe.Writer.AsStream();
+ Assert.True(stream.CanWrite);
+ Assert.False(stream.CanSeek);
+ Assert.False(stream.CanRead);
+ Assert.Throws<NotSupportedException>(() => { long length = stream.Length; });
+ Assert.Throws<NotSupportedException>(() => { long position = stream.Position; });
+ Assert.Throws<NotSupportedException>(() => stream.Seek(0, SeekOrigin.Begin));
+ Assert.Throws<NotSupportedException>(() => stream.Read(new byte[10], 0, 10));
+ await Assert.ThrowsAsync<NotSupportedException>(() => stream.ReadAsync(new byte[10], 0, 10));
+ await Assert.ThrowsAsync<NotSupportedException>(() => stream.ReadAsync(new byte[10]).AsTask());
+ await Assert.ThrowsAsync<NotSupportedException>(() => stream.CopyToAsync(Stream.Null));
+
+ pipe.Reader.Complete();
+ pipe.Writer.Complete();
+ }
+
+ [Fact]
+ public async Task CancellingPendingFlushThrowsOperationCancelledException()
+ {
+ var pipe = new Pipe(new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 0));
+ byte[] helloBytes = Encoding.ASCII.GetBytes("Hello World");
+
+ Stream stream = pipe.Writer.AsStream();
+ ValueTask task = stream.WriteAsync(helloBytes);
+ Assert.False(task.IsCompleted);
+
+ pipe.Writer.CancelPendingFlush();
+
+ await Assert.ThrowsAsync<OperationCanceledException>(async () => await task);
+ pipe.Writer.Complete();
+ pipe.Reader.Complete();
+ }
+
+ [Fact]
+ public async Task CancellationTokenFlowsToUnderlyingPipeWriter()
+ {
+ var pipe = new Pipe(new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 0));
+ byte[] helloBytes = Encoding.ASCII.GetBytes("Hello World");
+
+ Stream stream = pipe.Writer.AsStream();
+ var cts = new CancellationTokenSource();
+ ValueTask task = stream.WriteAsync(helloBytes, cts.Token);
+ Assert.False(task.IsCompleted);
+
+ cts.Cancel();
+
+ await Assert.ThrowsAsync<OperationCanceledException>(async () => await task);
+ pipe.Writer.Complete();
+ pipe.Reader.Complete();
+ }
+
+ [Fact]
+ public async Task DefaultPipeWriterImplementationReturnsPipeWriterStream()
+ {
+ var pipeWriter = new TestPipeWriter();
+ Stream stream = pipeWriter.AsStream();
+
+ await stream.WriteAsync(new byte[10]);
+
+ Assert.True(pipeWriter.WriteAsyncCalled);
+
+ await stream.FlushAsync();
+
+ Assert.True(pipeWriter.FlushCalled);
+ }
+
+ public class TestPipeWriter : PipeWriter
+ {
+ public bool FlushCalled { get; set; }
+ public bool WriteAsyncCalled { get; set; }
+
+ public override void Advance(int bytes)
+ {
+ throw new NotImplementedException();
+ }
+
+ public override void CancelPendingFlush()
+ {
+ throw new NotImplementedException();
+ }
+
+ public override void Complete(Exception exception = null)
+ {
+ throw new NotImplementedException();
+ }
+
+ public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default)
+ {
+ FlushCalled = true;
+ return default;
+ }
+
+ public override Memory<byte> GetMemory(int sizeHint = 0)
+ {
+ throw new NotImplementedException();
+ }
+
+ public override Span<byte> GetSpan(int sizeHint = 0)
+ {
+ throw new NotImplementedException();
+ }
+
+ public override void OnReaderCompleted(Action<Exception, object> callback, object state)
+ {
+ throw new NotImplementedException();
+ }
+
+ public override ValueTask<FlushResult> WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
+ {
+ WriteAsyncCalled = true;
+ return default;
+ }
+ }
+
+ public static IEnumerable<object[]> WriteCalls
+ {
+ get
+ {
+ WriteAsyncDelegate writeArrayAsync = (stream, data) =>
+ {
+ return stream.WriteAsync(data, 0, data.Length);
+ };
+
+ WriteAsyncDelegate writeMemoryAsync = async (stream, data) =>
+ {
+ await stream.WriteAsync(data);
+ };
+
+ WriteAsyncDelegate writeArraySync = (stream, data) =>
+ {
+ stream.Write(data, 0, data.Length);
+ return Task.CompletedTask;
+ };
+
+ WriteAsyncDelegate writeSpanSync = (stream, data) =>
+ {
+ stream.Write(data);
+ return Task.CompletedTask;
+ };
+
+ yield return new object[] { writeArrayAsync };
+ yield return new object[] { writeMemoryAsync };
+ yield return new object[] { writeArraySync };
+ yield return new object[] { writeSpanSync };
+ }
+ }
+ }
+}
<Compile Include="PipeReaderWriterFacts.nonnetstandard.cs" />
<Compile Include="PipeResetTests.nonnetstandard.cs" />
<Compile Include="PipePoolTests.nonnetstandard.cs" />
+ <Compile Include="PipeWriterStreamTests.nonnetstandard.cs" />
+ <Compile Include="PipeReaderStreamTests.nonnetstandard.cs" />
</ItemGroup>
</Project>
\ No newline at end of file