Added AsStream to PipeReader and PipeWriter (dotnet/corefx#35399)
authorDavid Fowler <davidfowl@gmail.com>
Tue, 19 Feb 2019 22:24:50 +0000 (14:24 -0800)
committerGitHub <noreply@github.com>
Tue, 19 Feb 2019 22:24:50 +0000 (14:24 -0800)
- This adds a new virtual member to PipeReader and PipeWriter to get a read only or write only stream from the PipeReader and PipeWriter
- This introduces a new field on the base types
- Added tests

Commit migrated from https://github.com/dotnet/corefx/commit/968a6a4450ca5acd3aed6a6f6e1f8b445348e729

src/libraries/System.IO.Pipelines/ref/System.IO.Pipelines.cs
src/libraries/System.IO.Pipelines/src/Resources/Strings.resx
src/libraries/System.IO.Pipelines/src/System.IO.Pipelines.csproj
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeReader.cs
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeReaderStream.cs [new file with mode: 0644]
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeWriter.cs
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeWriterStream.cs [new file with mode: 0644]
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/ThrowHelper.cs
src/libraries/System.IO.Pipelines/tests/PipeReaderStreamTests.nonnetstandard.cs [new file with mode: 0644]
src/libraries/System.IO.Pipelines/tests/PipeWriterStreamTests.nonnetstandard.cs [new file with mode: 0644]
src/libraries/System.IO.Pipelines/tests/System.IO.Pipelines.Tests.csproj

index 7382572..e786214 100644 (file)
@@ -44,6 +44,7 @@ namespace System.IO.Pipelines
         protected PipeReader() { }
         public abstract void AdvanceTo(System.SequencePosition consumed);
         public abstract void AdvanceTo(System.SequencePosition consumed, System.SequencePosition examined);
+        public virtual System.IO.Stream AsStream() { throw null; }
         public abstract void CancelPendingRead();
         public abstract void Complete(System.Exception exception = null);
         public virtual System.Threading.Tasks.Task CopyToAsync(System.IO.Stream destination, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
@@ -62,6 +63,7 @@ namespace System.IO.Pipelines
     {
         protected PipeWriter() { }
         public abstract void Advance(int bytes);
+        public virtual System.IO.Stream AsStream() { throw null; }
         public abstract void CancelPendingFlush();
         public abstract void Complete(System.Exception exception = null);
         protected internal virtual System.Threading.Tasks.Task CopyFromAsync(System.IO.Stream source, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
index 7af100a..6657d16 100644 (file)
@@ -1,17 +1,17 @@
 <?xml version="1.0" encoding="utf-8"?>
 <root>
-  <!--
-    Microsoft ResX Schema
-
+  <!-- 
+    Microsoft ResX Schema 
+    
     Version 2.0
-
-    The primary goals of this format is to allow a simple XML format
-    that is mostly human readable. The generation and parsing of the
-    various data types are done through the TypeConverter classes
+    
+    The primary goals of this format is to allow a simple XML format 
+    that is mostly human readable. The generation and parsing of the 
+    various data types are done through the TypeConverter classes 
     associated with the data types.
-
+    
     Example:
-
+    
     ... ado.net/XML headers & schema ...
     <resheader name="resmimetype">text/microsoft-resx</resheader>
     <resheader name="version">2.0</resheader>
         <value>[base64 mime encoded string representing a byte array form of the .NET Framework object]</value>
         <comment>This is a comment</comment>
     </data>
-
-    There are any number of "resheader" rows that contain simple
+                
+    There are any number of "resheader" rows that contain simple 
     name/value pairs.
-
-    Each data row contains a name, and value. The row also contains a
-    type or mimetype. Type corresponds to a .NET class that support
-    text/value conversion through the TypeConverter architecture.
-    Classes that don't support this are serialized and stored with the
+    
+    Each data row contains a name, and value. The row also contains a 
+    type or mimetype. Type corresponds to a .NET class that support 
+    text/value conversion through the TypeConverter architecture. 
+    Classes that don't support this are serialized and stored with the 
     mimetype set.
-
-    The mimetype is used for serialized objects, and tells the
-    ResXResourceReader how to depersist the object. This is currently not
+    
+    The mimetype is used for serialized objects, and tells the 
+    ResXResourceReader how to depersist the object. This is currently not 
     extensible. For a given mimetype the value must be set accordingly:
-
-    Note - application/x-microsoft.net.object.binary.base64 is the format
-    that the ResXResourceWriter will generate, however the reader can
+    
+    Note - application/x-microsoft.net.object.binary.base64 is the format 
+    that the ResXResourceWriter will generate, however the reader can 
     read any of the formats listed below.
-
+    
     mimetype: application/x-microsoft.net.object.binary.base64
-    value   : The object must be serialized with
+    value   : The object must be serialized with 
             : System.Runtime.Serialization.Formatters.Binary.BinaryFormatter
             : and then encoded with base64 encoding.
-
+    
     mimetype: application/x-microsoft.net.object.soap.base64
-    value   : The object must be serialized with
+    value   : The object must be serialized with 
             : System.Runtime.Serialization.Formatters.Soap.SoapFormatter
             : and then encoded with base64 encoding.
 
     mimetype: application/x-microsoft.net.object.bytearray.base64
-    value   : The object must be serialized into a byte array
+    value   : The object must be serialized into a byte array 
             : using a System.ComponentModel.TypeConverter
             : and then encoded with base64 encoding.
     -->
   <data name="ConcurrentOperationsNotSupported" xml:space="preserve">
     <value>Concurrent reads or writes are not supported.</value>
   </data>
+  <data name="FlushCanceledOnPipeWriter" xml:space="preserve">
+    <value>Flush was canceled on underlying PipeWriter.</value>
+  </data>
   <data name="GetResultBeforeCompleted" xml:space="preserve">
     <value>Can't GetResult unless awaiter is completed.</value>
   </data>
+  <data name="InvalidZeroByteRead" xml:space="preserve">
+    <value>The PipeReader returned 0 bytes when the ReadResult was not completed or canceled.</value>
+  </data>
   <data name="NoReadingOperationToComplete" xml:space="preserve">
     <value>No reading operation to complete.</value>
   </data>
   <data name="NoWritingOperation" xml:space="preserve">
     <value>No writing operation. Make sure GetMemory() was called.</value>
   </data>
+  <data name="ReadCanceledOnPipeReader" xml:space="preserve">
+    <value>Read was canceled on underlying PipeReader.</value>
+  </data>
   <data name="ReaderAndWriterHasToBeCompleted" xml:space="preserve">
     <value>Both reader and writer has to be completed to be able to reset the pipe.</value>
   </data>
   <data name="WritingAfterCompleted" xml:space="preserve">
     <value>Writing is not allowed after writer was completed.</value>
   </data>
-</root>
+</root>
\ No newline at end of file
index 2758ac3..4f81ddc 100644 (file)
@@ -4,6 +4,9 @@
     <Configurations>netcoreapp-Debug;netcoreapp-Release;netstandard-Debug;netstandard-Release</Configurations>
   </PropertyGroup>
   <ItemGroup>
+    <Compile Include="$(CommonPath)\CoreLib\System\Threading\Tasks\TaskToApm.cs">
+      <Link>Common\CoreLib\System\Threading\Tasks\TaskToApm.cs</Link>
+    </Compile>
     <Compile Include="Properties\InternalsVisibleTo.cs" />
     <Compile Include="System\IO\Pipelines\BufferSegment.cs" />
     <Compile Include="System\IO\Pipelines\CompletionData.cs" />
     <Compile Include="System\IO\Pipelines\PipeOptions.cs" />
     <Compile Include="System\IO\Pipelines\PipeReader.cs" />
     <Compile Include="System\IO\Pipelines\PipeOperationState.cs" />
+    <Compile Include="System\IO\Pipelines\PipeReaderStream.cs" />
     <Compile Include="System\IO\Pipelines\PipeScheduler.cs" />
     <Compile Include="System\IO\Pipelines\PipeWriter.cs" />
+    <Compile Include="System\IO\Pipelines\PipeWriterStream.cs" />
     <Compile Include="System\IO\Pipelines\ReadResult.cs" />
     <Compile Include="System\IO\Pipelines\ResultFlags.cs" />
     <Compile Include="System\IO\Pipelines\StreamPipeExtensions.cs" />
index 85e4899..f5b0db7 100644 (file)
@@ -13,6 +13,8 @@ namespace System.IO.Pipelines
     /// </summary>
     public abstract partial class PipeReader
     {
+        private PipeReaderStream _stream;
+
         /// <summary>
         /// Attempt to synchronously read data the <see cref="PipeReader"/>.
         /// </summary>
@@ -49,6 +51,15 @@ namespace System.IO.Pipelines
         public abstract void AdvanceTo(SequencePosition consumed, SequencePosition examined);
 
         /// <summary>
+        /// 
+        /// </summary>
+        /// <returns></returns>
+        public virtual Stream AsStream()
+        {
+            return _stream ?? (_stream = new PipeReaderStream(this));
+        }
+
+        /// <summary>
         /// Cancel to currently pending or if none is pending next call to <see cref="ReadAsync"/>, without completing the <see cref="PipeReader"/>.
         /// </summary>
         public abstract void CancelPendingRead();
@@ -99,7 +110,7 @@ namespace System.IO.Pipelines
 
                     if (result.IsCanceled)
                     {
-                        throw new OperationCanceledException();
+                        ThrowHelper.ThrowOperationCanceledException_ReadCanceled();
                     }
 
                     while (buffer.TryGet(ref position, out ReadOnlyMemory<byte> memory))
diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeReaderStream.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeReaderStream.cs
new file mode 100644 (file)
index 0000000..6b449db
--- /dev/null
@@ -0,0 +1,114 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Buffers;
+using System.Diagnostics;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.IO.Pipelines
+{
+    internal sealed class PipeReaderStream : Stream
+    {
+        private readonly PipeReader _pipeReader;
+
+        public PipeReaderStream(PipeReader pipeReader)
+        {
+            Debug.Assert(pipeReader != null);
+            _pipeReader = pipeReader;
+        }
+
+        public override bool CanRead => true;
+
+        public override bool CanSeek => false;
+
+        public override bool CanWrite => false;
+
+        public override long Length => throw new NotSupportedException();
+
+        public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
+
+        public override void Flush()
+        {
+        }
+
+        public override int Read(byte[] buffer, int offset, int count)
+        {
+            return ReadAsync(buffer, offset, count).GetAwaiter().GetResult();
+        }
+
+        public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
+
+        public override void SetLength(long value) => throw new NotSupportedException();
+
+        public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
+
+        public sealed override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state) =>
+            TaskToApm.Begin(ReadAsync(buffer, offset, count, default), callback, state);
+
+        public sealed override int EndRead(IAsyncResult asyncResult) =>
+            TaskToApm.End<int>(asyncResult);
+
+        public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+        {
+            return ReadAsyncInternal(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask();
+        }
+
+#if !netstandard
+        public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
+        {
+            return ReadAsyncInternal(buffer, cancellationToken);
+        }
+#endif
+
+        private async ValueTask<int> ReadAsyncInternal(Memory<byte> buffer, CancellationToken cancellationToken)
+        {
+            ReadResult result = await _pipeReader.ReadAsync(cancellationToken).ConfigureAwait(false);
+
+            if (result.IsCanceled)
+            {
+                ThrowHelper.ThrowOperationCanceledException_ReadCanceled();
+            }
+
+            ReadOnlySequence<byte> sequence = result.Buffer;
+            long bufferLength = sequence.Length;
+            SequencePosition consumed = sequence.Start;
+
+            try
+            {
+                if (bufferLength != 0)
+                {
+                    int actual = (int)Math.Min(bufferLength, buffer.Length);
+
+                    ReadOnlySequence<byte> slice = actual == bufferLength ? sequence : sequence.Slice(0, actual);
+                    consumed = slice.End;
+                    slice.CopyTo(buffer.Span);
+
+                    return actual;
+                }
+
+                if (result.IsCompleted)
+                {
+                    return 0;
+                }
+            }
+            finally
+            {
+                _pipeReader.AdvanceTo(consumed);
+            }
+
+            // This is a buggy PipeReader implementation that returns 0 byte reads even though the PipeReader
+            // isn't completed or canceled
+            ThrowHelper.ThrowInvalidOperationException_InvalidZeroByteRead();
+            return 0;
+        }
+
+        public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
+        {
+            // Delegate to CopyToAsync on the PipeReader
+            return _pipeReader.CopyToAsync(destination, cancellationToken);
+        }
+    }
+}
index 821fa9e..036ac1b 100644 (file)
@@ -13,6 +13,8 @@ namespace System.IO.Pipelines
     /// </summary>
     public abstract partial class PipeWriter : IBufferWriter<byte>
     {
+        private PipeWriterStream _stream;
+
         /// <summary>
         /// Marks the <see cref="PipeWriter"/> as being complete, meaning no more items will be written to it.
         /// </summary>
@@ -44,6 +46,15 @@ namespace System.IO.Pipelines
         public abstract Span<byte> GetSpan(int sizeHint = 0);
 
         /// <summary>
+        /// 
+        /// </summary>
+        /// <returns></returns>
+        public virtual Stream AsStream()
+        {
+            return _stream ?? (_stream = new PipeWriterStream(this));
+        }
+
+        /// <summary>
         /// Writes <paramref name="source"/> to the pipe and makes data accessible to <see cref="PipeReader"/>
         /// </summary>
         public virtual ValueTask<FlushResult> WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
@@ -76,7 +87,7 @@ namespace System.IO.Pipelines
 
                 if (result.IsCanceled)
                 {
-                    throw new OperationCanceledException();
+                    ThrowHelper.ThrowOperationCanceledException_FlushCanceled();
                 }
 
                 if (result.IsCompleted)
diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeWriterStream.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeWriterStream.cs
new file mode 100644 (file)
index 0000000..cdab755
--- /dev/null
@@ -0,0 +1,103 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System.Diagnostics;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.IO.Pipelines
+{
+    internal sealed class PipeWriterStream : Stream
+    {
+        private readonly PipeWriter _pipeWriter;
+
+        public PipeWriterStream(PipeWriter pipeWriter)
+        {
+            Debug.Assert(pipeWriter != null);
+            _pipeWriter = pipeWriter;
+        }
+
+        public override bool CanRead => false;
+
+        public override bool CanSeek => false;
+
+        public override bool CanWrite => true;
+
+        public override long Length => throw new NotSupportedException();
+
+        public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
+
+        public override void Flush()
+        {
+            FlushAsync().GetAwaiter().GetResult();
+        }
+
+        public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
+
+        public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
+
+        public override void SetLength(long value) => throw new NotSupportedException();
+
+        public sealed override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state) =>
+            TaskToApm.Begin(WriteAsync(buffer, offset, count, default), callback, state);
+
+        public sealed override void EndWrite(IAsyncResult asyncResult) =>
+            TaskToApm.End(asyncResult);
+
+        public override void Write(byte[] buffer, int offset, int count)
+        {
+            WriteAsync(buffer, offset, count).GetAwaiter().GetResult();
+        }
+
+        public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+        {
+            ValueTask<FlushResult> valueTask = _pipeWriter.WriteAsync(new ReadOnlyMemory<byte>(buffer, offset, count), cancellationToken);
+
+            return GetFlushResultAsTask(valueTask);
+        }
+
+#if !netstandard
+        public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
+        {
+            ValueTask<FlushResult> valueTask = _pipeWriter.WriteAsync(buffer, cancellationToken);
+
+            return new ValueTask(GetFlushResultAsTask(valueTask));
+        }
+#endif
+
+        public override Task FlushAsync(CancellationToken cancellationToken)
+        {
+            ValueTask<FlushResult> valueTask = _pipeWriter.FlushAsync(cancellationToken);
+
+            return GetFlushResultAsTask(valueTask);
+        }
+
+        private static Task GetFlushResultAsTask(ValueTask<FlushResult> valueTask)
+        {
+            if (valueTask.IsCompletedSuccessfully)
+            {
+                FlushResult result = valueTask.Result;
+                if (result.IsCanceled)
+                {
+                    ThrowHelper.ThrowOperationCanceledException_FlushCanceled();
+                }
+
+                return Task.CompletedTask;
+            }
+
+            static async Task AwaitTask(ValueTask<FlushResult> valueTask)
+            {
+                FlushResult result = await valueTask.ConfigureAwait(false);
+
+                if (result.IsCanceled)
+                {
+                    ThrowHelper.ThrowOperationCanceledException_FlushCanceled();
+                }
+            }
+
+            return AwaitTask(valueTask);
+        }
+    }
+}
+
index 5b5b51d..c9bcff4 100644 (file)
@@ -60,6 +60,18 @@ namespace System.IO.Pipelines
         public static void ThrowInvalidOperationException_ResetIncompleteReaderWriter() => throw CreateInvalidOperationException_ResetIncompleteReaderWriter();
         [MethodImpl(MethodImplOptions.NoInlining)]
         public static Exception CreateInvalidOperationException_ResetIncompleteReaderWriter() => new InvalidOperationException(SR.ReaderAndWriterHasToBeCompleted);
+
+        public static void ThrowOperationCanceledException_ReadCanceled() => throw CreateOperationCanceledException_ReadCanceled();
+        [MethodImpl(MethodImplOptions.NoInlining)]
+        public static Exception CreateOperationCanceledException_ReadCanceled() => new OperationCanceledException(SR.ReadCanceledOnPipeReader);
+
+        public static void ThrowOperationCanceledException_FlushCanceled() => throw CreateOperationCanceledException_FlushCanceled();
+        [MethodImpl(MethodImplOptions.NoInlining)]
+        public static Exception CreateOperationCanceledException_FlushCanceled() => new OperationCanceledException(SR.FlushCanceledOnPipeWriter);
+
+        public static void ThrowInvalidOperationException_InvalidZeroByteRead() => throw CreateInvalidOperationException_InvalidZeroByteRead();
+        [MethodImpl(MethodImplOptions.NoInlining)]
+        public static Exception CreateInvalidOperationException_InvalidZeroByteRead() => new InvalidOperationException(SR.InvalidZeroByteRead);
     }
 
     internal enum ExceptionArgument
diff --git a/src/libraries/System.IO.Pipelines/tests/PipeReaderStreamTests.nonnetstandard.cs b/src/libraries/System.IO.Pipelines/tests/PipeReaderStreamTests.nonnetstandard.cs
new file mode 100644 (file)
index 0000000..85491b2
--- /dev/null
@@ -0,0 +1,328 @@
+using System;
+using System.Buffers;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Xunit;
+
+namespace System.IO.Pipelines.Tests
+{
+    public class PipeReaderStreamTests
+    {
+        public delegate Task<int> ReadAsyncDelegate(Stream stream, byte[] data);
+
+        [Theory]
+        [MemberData(nameof(ReadCalls))]
+        public async Task ReadingFromPipeReaderStreamReadsFromUnderlyingPipeReader(ReadAsyncDelegate readAsync)
+        {
+            byte[] helloBytes = Encoding.ASCII.GetBytes("Hello World");
+            var pipe = new Pipe();
+            await pipe.Writer.WriteAsync(helloBytes);
+            pipe.Writer.Complete();
+
+            var stream = new PipeReaderStream(pipe.Reader);
+
+            var buffer = new byte[1024];
+            int read = await readAsync(stream, buffer);
+
+            Assert.Equal(helloBytes, buffer.AsSpan(0, read).ToArray());
+            pipe.Reader.Complete();
+        }
+
+        [Theory]
+        [MemberData(nameof(ReadCalls))]
+        public async Task AsStreamReturnsPipeReaderStream(ReadAsyncDelegate readAsync)
+        {
+            byte[] helloBytes = Encoding.ASCII.GetBytes("Hello World");
+            var pipe = new Pipe();
+            await pipe.Writer.WriteAsync(helloBytes);
+            pipe.Writer.Complete();
+
+            Stream stream = pipe.Reader.AsStream();
+
+            var buffer = new byte[1024];
+            int read = await readAsync(stream, buffer);
+
+            Assert.Equal(helloBytes, buffer.AsSpan(0, read).ToArray());
+            pipe.Reader.Complete();
+        }
+
+        [Fact]
+        public async Task ReadingWithSmallerBufferWorks()
+        {
+            byte[] helloBytes = Encoding.ASCII.GetBytes("Hello World");
+            var pipe = new Pipe();
+            await pipe.Writer.WriteAsync(helloBytes);
+            pipe.Writer.Complete();
+
+            Stream stream = pipe.Reader.AsStream();
+
+            var buffer = new byte[5];
+            int read = await stream.ReadAsync(buffer);
+
+            Assert.Equal(5, read);
+            Assert.Equal(helloBytes.AsSpan(0, 5).ToArray(), buffer);
+
+            buffer = new byte[3];
+            read = await stream.ReadAsync(buffer);
+
+            Assert.Equal(3, read);
+            Assert.Equal(helloBytes.AsSpan(5, 3).ToArray(), buffer);
+
+            // Verify that the buffer is partially consumed and we can read the rest from the PipeReader directly
+            ReadResult result = await pipe.Reader.ReadAsync();
+            Assert.Equal(helloBytes.AsSpan(8).ToArray(), result.Buffer.ToArray());
+            pipe.Reader.AdvanceTo(result.Buffer.End);
+
+            pipe.Reader.Complete();
+        }
+
+        [Fact]
+        public async Task EndOfPipeReaderReturnsZeroBytesFromReadAsync()
+        {
+            var pipe = new Pipe();
+            Memory<byte> memory = pipe.Writer.GetMemory();
+            pipe.Writer.Advance(5);
+            pipe.Writer.Complete();
+
+            Stream stream = pipe.Reader.AsStream();
+
+            var buffer = new byte[5];
+            var read = await stream.ReadAsync(buffer);
+
+            Assert.Equal(5, read);
+
+            read = await stream.ReadAsync(buffer);
+
+            // Read again to make sure it always returns 0
+            Assert.Equal(0, read);
+
+            pipe.Reader.Complete();
+        }
+
+        [Fact]
+        public async Task BuggyPipeReaderImplementationThrows()
+        {
+            var pipeReader = new BuggyPipeReader();
+            
+            Stream stream = pipeReader.AsStream();
+
+            await Assert.ThrowsAsync<InvalidOperationException>(async () => await stream.ReadAsync(new byte[5]));
+        }
+
+        [Fact]
+        public async Task WritingToPipeReaderStreamThrowsNotSupported()
+        {
+            var pipe = new Pipe();
+
+            Stream stream = pipe.Reader.AsStream();
+            Assert.False(stream.CanWrite);
+            Assert.False(stream.CanSeek);
+            Assert.True(stream.CanRead);
+            Assert.Throws<NotSupportedException>(() => { long length = stream.Length; });
+            Assert.Throws<NotSupportedException>(() => { long position = stream.Position; });
+            Assert.Throws<NotSupportedException>(() => stream.Seek(0, SeekOrigin.Begin));
+            Assert.Throws<NotSupportedException>(() => stream.Write(new byte[10], 0, 10));
+            await Assert.ThrowsAsync<NotSupportedException>(() => stream.WriteAsync(new byte[10], 0, 10));
+            await Assert.ThrowsAsync<NotSupportedException>(() => stream.WriteAsync(new byte[10]).AsTask());
+
+            pipe.Reader.Complete();
+            pipe.Writer.Complete();
+        }
+
+        [Fact]
+        public async Task CancellingPendingReadThrowsOperationCancelledException()
+        {
+            var pipe = new Pipe();
+
+            Stream stream = pipe.Reader.AsStream();
+            ValueTask<int> task = stream.ReadAsync(new byte[1024]);
+            Assert.False(task.IsCompleted);
+
+            pipe.Reader.CancelPendingRead();
+
+            await Assert.ThrowsAsync<OperationCanceledException>(async () => await task);
+            pipe.Writer.Complete();
+            pipe.Reader.Complete();
+        }
+
+        [Fact]
+        public async Task CanReadAfterCancellingPendingRead()
+        {
+            var pipe = new Pipe();
+
+            Stream stream = pipe.Reader.AsStream();
+            ValueTask<int> task = stream.ReadAsync(new byte[1024]);
+            Assert.False(task.IsCompleted);
+
+            pipe.Reader.CancelPendingRead();
+
+            await Assert.ThrowsAsync<OperationCanceledException>(async () => await task);
+            pipe.Writer.Complete();
+
+            ReadResult result = await pipe.Reader.ReadAsync();
+            Assert.True(result.IsCompleted);
+
+            pipe.Reader.Complete();
+        }
+
+        [Fact]
+        public async Task CancellationTokenFlowsToUnderlyingPipeReader()
+        {
+            var pipe = new Pipe();
+
+            Stream stream = pipe.Reader.AsStream();
+            var cts = new CancellationTokenSource();
+            ValueTask<int> task = stream.ReadAsync(new byte[1024], cts.Token);
+            Assert.False(task.IsCompleted);
+
+            cts.Cancel();
+
+            await Assert.ThrowsAsync<OperationCanceledException>(async () => await task);
+            pipe.Writer.Complete();
+            pipe.Reader.Complete();
+        }
+
+        [Fact]
+        public async Task DefaultPipeReaderImplementationReturnsPipeReaderStream()
+        {
+            var pipeReader = new TestPipeReader();
+            Stream stream = pipeReader.AsStream();
+
+            await stream.ReadAsync(new byte[10]);
+
+            Assert.True(pipeReader.ReadCalled);
+            Assert.True(pipeReader.AdvanceToCalled);
+        }
+
+        [Fact]
+        public void AsStreamReturnsSameInstance()
+        {
+            var pipeReader = new TestPipeReader();
+            Stream stream = pipeReader.AsStream();
+
+            Assert.Same(stream, pipeReader.AsStream());
+        }
+
+        public class BuggyPipeReader : PipeReader
+        {
+            public override void AdvanceTo(SequencePosition consumed)
+            {
+                
+            }
+
+            public override void AdvanceTo(SequencePosition consumed, SequencePosition examined)
+            {
+                
+            }
+
+            public override void CancelPendingRead()
+            {
+                throw new NotImplementedException();
+            }
+
+            public override void Complete(Exception exception = null)
+            {
+                throw new NotImplementedException();
+            }
+
+            public override void OnWriterCompleted(Action<Exception, object> callback, object state)
+            {
+                throw new NotImplementedException();
+            }
+
+            public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
+            {
+                // Returns a ReadResult with no buffer and with IsCompleted and IsCancelled false
+                return default;
+            }
+
+            public override bool TryRead(out ReadResult result)
+            {
+                throw new NotImplementedException();
+            }
+        }
+
+        public class TestPipeReader : PipeReader
+        {
+            public bool ReadCalled { get; set; }
+            public bool AdvanceToCalled { get; set; }
+
+            public override void AdvanceTo(SequencePosition consumed)
+            {
+                AdvanceToCalled = true;
+            }
+
+            public override void AdvanceTo(SequencePosition consumed, SequencePosition examined)
+            {
+                throw new NotImplementedException();
+            }
+
+            public override void CancelPendingRead()
+            {
+                throw new NotImplementedException();
+            }
+
+            public override void Complete(Exception exception = null)
+            {
+                throw new NotImplementedException();
+            }
+
+            public override void OnWriterCompleted(Action<Exception, object> callback, object state)
+            {
+                throw new NotImplementedException();
+            }
+
+            public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
+            {
+                ReadCalled = true;
+                return new ValueTask<ReadResult>(new ReadResult(default, isCanceled: false, isCompleted: true));
+            }
+
+            public override bool TryRead(out ReadResult result)
+            {
+                throw new NotImplementedException();
+            }
+        }
+
+        public static IEnumerable<object[]> ReadCalls
+        {
+            get
+            {
+                ReadAsyncDelegate readArrayAsync = (stream, data) =>
+                {
+                    return stream.ReadAsync(data, 0, data.Length);
+                };
+
+                ReadAsyncDelegate readMemoryAsync = async (stream, data) =>
+                {
+                    return await stream.ReadAsync(data);
+                };
+
+                ReadAsyncDelegate readMemoryAsyncWithThreadHop = async (stream, data) =>
+                {
+                    await Task.Yield();
+
+                    return await stream.ReadAsync(data);
+                };
+
+                ReadAsyncDelegate readArraySync = (stream, data) =>
+                {
+                    return Task.FromResult(stream.Read(data, 0, data.Length));
+                };
+
+                ReadAsyncDelegate readSpanSync = (stream, data) =>
+                {
+                    return Task.FromResult(stream.Read(data));
+                };
+
+                yield return new object[] { readArrayAsync };
+                yield return new object[] { readMemoryAsync };
+                yield return new object[] { readMemoryAsyncWithThreadHop };
+                yield return new object[] { readArraySync };
+                yield return new object[] { readSpanSync };
+            }
+        }
+    }
+}
diff --git a/src/libraries/System.IO.Pipelines/tests/PipeWriterStreamTests.nonnetstandard.cs b/src/libraries/System.IO.Pipelines/tests/PipeWriterStreamTests.nonnetstandard.cs
new file mode 100644 (file)
index 0000000..9d66340
--- /dev/null
@@ -0,0 +1,219 @@
+using System;
+using System.Buffers;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Xunit;
+
+namespace System.IO.Pipelines.Tests
+{
+    public class PipeWriterStreamTests
+    {
+        public delegate Task WriteAsyncDelegate(Stream stream, byte[] data);
+
+        [Theory]
+        [MemberData(nameof(WriteCalls))]
+        public async Task WritingToPipeStreamWritesToUnderlyingPipeWriter(WriteAsyncDelegate writeAsync)
+        {
+            byte[] helloBytes = Encoding.ASCII.GetBytes("Hello World");
+            var pipe = new Pipe();
+            var stream = new PipeWriterStream(pipe.Writer);
+
+            await writeAsync(stream, helloBytes);
+
+            ReadResult result = await pipe.Reader.ReadAsync();
+            Assert.Equal(helloBytes, result.Buffer.ToArray());
+            pipe.Reader.Complete();
+            pipe.Writer.Complete();
+        }
+
+        [Theory]
+        [MemberData(nameof(WriteCalls))]
+        public async Task AsStreamReturnsPipeWriterStream(WriteAsyncDelegate writeAsync)
+        {
+            byte[] helloBytes = Encoding.ASCII.GetBytes("Hello World");
+            var pipe = new Pipe();
+            Stream stream = pipe.Writer.AsStream();
+
+            await writeAsync(stream, helloBytes);
+
+            ReadResult result = await pipe.Reader.ReadAsync();
+            Assert.Equal(helloBytes, result.Buffer.ToArray());
+            pipe.Reader.Complete();
+            pipe.Writer.Complete();
+        }
+
+        [Fact]
+        public async Task FlushAsyncFlushesBufferedData()
+        {
+            byte[] helloBytes = Encoding.ASCII.GetBytes("Hello World");
+            var pipe = new Pipe();
+
+            Memory<byte> memory = pipe.Writer.GetMemory();
+            helloBytes.CopyTo(memory);
+            pipe.Writer.Advance(helloBytes.Length);
+
+            Stream stream = pipe.Writer.AsStream();
+            await stream.FlushAsync();
+
+            ReadResult result = await pipe.Reader.ReadAsync();
+            Assert.Equal(helloBytes, result.Buffer.ToArray());
+            pipe.Reader.Complete();
+            pipe.Writer.Complete();
+        }
+
+        [Fact]
+        public async Task ReadingFromPipeWriterStreamThrowsNotSupported()
+        {
+            byte[] helloBytes = Encoding.ASCII.GetBytes("Hello World");
+            var pipe = new Pipe();
+
+            Stream stream = pipe.Writer.AsStream();
+            Assert.True(stream.CanWrite);
+            Assert.False(stream.CanSeek);
+            Assert.False(stream.CanRead);
+            Assert.Throws<NotSupportedException>(() => { long length = stream.Length; });
+            Assert.Throws<NotSupportedException>(() => { long position = stream.Position; });
+            Assert.Throws<NotSupportedException>(() => stream.Seek(0, SeekOrigin.Begin));
+            Assert.Throws<NotSupportedException>(() => stream.Read(new byte[10], 0, 10));
+            await Assert.ThrowsAsync<NotSupportedException>(() => stream.ReadAsync(new byte[10], 0, 10));
+            await Assert.ThrowsAsync<NotSupportedException>(() => stream.ReadAsync(new byte[10]).AsTask());
+            await Assert.ThrowsAsync<NotSupportedException>(() => stream.CopyToAsync(Stream.Null));
+
+            pipe.Reader.Complete();
+            pipe.Writer.Complete();
+        }
+
+        [Fact]
+        public async Task CancellingPendingFlushThrowsOperationCancelledException()
+        {
+            var pipe = new Pipe(new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 0));
+            byte[] helloBytes = Encoding.ASCII.GetBytes("Hello World");
+
+            Stream stream = pipe.Writer.AsStream();
+            ValueTask task = stream.WriteAsync(helloBytes);
+            Assert.False(task.IsCompleted);
+
+            pipe.Writer.CancelPendingFlush();
+
+            await Assert.ThrowsAsync<OperationCanceledException>(async () => await task);
+            pipe.Writer.Complete();
+            pipe.Reader.Complete();
+        }
+
+        [Fact]
+        public async Task CancellationTokenFlowsToUnderlyingPipeWriter()
+        {
+            var pipe = new Pipe(new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 0));
+            byte[] helloBytes = Encoding.ASCII.GetBytes("Hello World");
+
+            Stream stream = pipe.Writer.AsStream();
+            var cts = new CancellationTokenSource();
+            ValueTask task = stream.WriteAsync(helloBytes, cts.Token);
+            Assert.False(task.IsCompleted);
+
+            cts.Cancel();
+
+            await Assert.ThrowsAsync<OperationCanceledException>(async () => await task);
+            pipe.Writer.Complete();
+            pipe.Reader.Complete();
+        }
+
+        [Fact]
+        public async Task DefaultPipeWriterImplementationReturnsPipeWriterStream()
+        {
+            var pipeWriter = new TestPipeWriter();
+            Stream stream = pipeWriter.AsStream();
+
+            await stream.WriteAsync(new byte[10]);
+
+            Assert.True(pipeWriter.WriteAsyncCalled);
+
+            await stream.FlushAsync();
+
+            Assert.True(pipeWriter.FlushCalled);
+        }
+
+        public class TestPipeWriter : PipeWriter
+        {
+            public bool FlushCalled { get; set; }
+            public bool WriteAsyncCalled { get; set; }
+
+            public override void Advance(int bytes)
+            {
+                throw new NotImplementedException();
+            }
+
+            public override void CancelPendingFlush()
+            {
+                throw new NotImplementedException();
+            }
+
+            public override void Complete(Exception exception = null)
+            {
+                throw new NotImplementedException();
+            }
+
+            public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default)
+            {
+                FlushCalled = true;
+                return default;
+            }
+
+            public override Memory<byte> GetMemory(int sizeHint = 0)
+            {
+                throw new NotImplementedException();
+            }
+
+            public override Span<byte> GetSpan(int sizeHint = 0)
+            {
+                throw new NotImplementedException();
+            }
+
+            public override void OnReaderCompleted(Action<Exception, object> callback, object state)
+            {
+                throw new NotImplementedException();
+            }
+
+            public override ValueTask<FlushResult> WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
+            {
+                WriteAsyncCalled = true;
+                return default;
+            }
+        }
+
+        public static IEnumerable<object[]> WriteCalls
+        {
+            get
+            {
+                WriteAsyncDelegate writeArrayAsync = (stream, data) =>
+                {
+                    return stream.WriteAsync(data, 0, data.Length);
+                };
+
+                WriteAsyncDelegate writeMemoryAsync = async (stream, data) =>
+                {
+                    await stream.WriteAsync(data);
+                };
+
+                WriteAsyncDelegate writeArraySync = (stream, data) =>
+                {
+                    stream.Write(data, 0, data.Length);
+                    return Task.CompletedTask;
+                };
+
+                WriteAsyncDelegate writeSpanSync = (stream, data) =>
+                {
+                    stream.Write(data);
+                    return Task.CompletedTask;
+                };
+
+                yield return new object[] { writeArrayAsync };
+                yield return new object[] { writeMemoryAsync };
+                yield return new object[] { writeArraySync };
+                yield return new object[] { writeSpanSync };
+            }
+        }
+    }
+}
index 2f6f1e5..da672ba 100644 (file)
@@ -35,5 +35,7 @@
     <Compile Include="PipeReaderWriterFacts.nonnetstandard.cs" />
     <Compile Include="PipeResetTests.nonnetstandard.cs" />
     <Compile Include="PipePoolTests.nonnetstandard.cs" />
+    <Compile Include="PipeWriterStreamTests.nonnetstandard.cs" />
+    <Compile Include="PipeReaderStreamTests.nonnetstandard.cs" />
   </ItemGroup>
 </Project>
\ No newline at end of file