Added methods to copy to and from streams (dotnet/corefx#35212)
authorDavid Fowler <davidfowl@gmail.com>
Sat, 9 Feb 2019 23:52:29 +0000 (15:52 -0800)
committerGitHub <noreply@github.com>
Sat, 9 Feb 2019 23:52:29 +0000 (15:52 -0800)
- Added Stream.CopyToAsync(PipeWriter)
- Added PipeWriter.CopyFromAsync(Stream)
- Added PipeReader.CopyToAsync(stream)

Commit migrated from https://github.com/dotnet/corefx/commit/603af6f2d84d9970a063bb96fbd4a2461ee3de42

src/libraries/System.IO.Pipelines/ref/System.IO.Pipelines.cs
src/libraries/System.IO.Pipelines/src/System.IO.Pipelines.csproj
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeReader.cs
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeWriter.cs
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamExtensions.netstandard.cs [new file with mode: 0644]
src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeExtensions.cs [new file with mode: 0644]
src/libraries/System.IO.Pipelines/tests/PipeReaderCopyToAsyncTests.cs [new file with mode: 0644]
src/libraries/System.IO.Pipelines/tests/PipeWriterCopyToAsyncTests.cs [new file with mode: 0644]
src/libraries/System.IO.Pipelines/tests/System.IO.Pipelines.Tests.csproj

index 4f92884..7382572 100644 (file)
@@ -9,7 +9,7 @@ namespace System.IO.Pipelines
 {
     public partial struct FlushResult
     {
-        private int _dummy;
+        private int _dummyPrimitive;
         public FlushResult(bool isCanceled, bool isCompleted) { throw null; }
         public bool IsCanceled { get { throw null; } }
         public bool IsCompleted { get { throw null; } }
@@ -46,6 +46,7 @@ namespace System.IO.Pipelines
         public abstract void AdvanceTo(System.SequencePosition consumed, System.SequencePosition examined);
         public abstract void CancelPendingRead();
         public abstract void Complete(System.Exception exception = null);
+        public virtual System.Threading.Tasks.Task CopyToAsync(System.IO.Stream destination, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
         public abstract void OnWriterCompleted(System.Action<System.Exception, object> callback, object state);
         public abstract System.Threading.Tasks.ValueTask<System.IO.Pipelines.ReadResult> ReadAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken));
         public abstract bool TryRead(out System.IO.Pipelines.ReadResult result);
@@ -63,6 +64,7 @@ namespace System.IO.Pipelines
         public abstract void Advance(int bytes);
         public abstract void CancelPendingFlush();
         public abstract void Complete(System.Exception exception = null);
+        protected internal virtual System.Threading.Tasks.Task CopyFromAsync(System.IO.Stream source, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
         public abstract System.Threading.Tasks.ValueTask<System.IO.Pipelines.FlushResult> FlushAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken));
         public abstract System.Memory<byte> GetMemory(int sizeHint = 0);
         public abstract System.Span<byte> GetSpan(int sizeHint = 0);
@@ -72,9 +74,14 @@ namespace System.IO.Pipelines
     public readonly partial struct ReadResult
     {
         private readonly object _dummy;
+        private readonly int _dummyPrimitive;
         public ReadResult(System.Buffers.ReadOnlySequence<byte> buffer, bool isCanceled, bool isCompleted) { throw null; }
         public System.Buffers.ReadOnlySequence<byte> Buffer { get { throw null; } }
         public bool IsCanceled { get { throw null; } }
         public bool IsCompleted { get { throw null; } }
     }
+    public static partial class StreamPipeExtensions
+    {
+        public static System.Threading.Tasks.Task CopyToAsync(this System.IO.Stream source, System.IO.Pipelines.PipeWriter destination, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
+    }
 }
index 9843486..2758ac3 100644 (file)
     <Compile Include="System\IO\Pipelines\PipeWriter.cs" />
     <Compile Include="System\IO\Pipelines\ReadResult.cs" />
     <Compile Include="System\IO\Pipelines\ResultFlags.cs" />
+    <Compile Include="System\IO\Pipelines\StreamPipeExtensions.cs" />
     <Compile Include="System\IO\Pipelines\ThrowHelper.cs" />
   </ItemGroup>
   <ItemGroup Condition="'$(TargetGroup)'=='netcoreapp'">
     <Compile Include="System\IO\Pipelines\ThreadPoolScheduler.netcoreapp.cs" />
   </ItemGroup>
   <ItemGroup Condition="'$(TargetGroup)'=='netstandard'">
+    <Compile Include="System\IO\Pipelines\StreamExtensions.netstandard.cs" />
     <Compile Include="System\IO\Pipelines\ThreadPoolScheduler.netstandard.cs" />
     <Compile Include="System\IO\Pipelines\CancellationTokenExtensions.netstandard.cs" />
   </ItemGroup>
@@ -43,6 +45,7 @@
     <Reference Include="System.Runtime.Extensions" />
     <Reference Include="System.Threading" />
     <Reference Include="System.Threading.Tasks.Extensions" />
+    <Reference Include="System.Threading.Tasks" />
     <Reference Include="System.Threading.ThreadPool" />
   </ItemGroup>
 </Project>
\ No newline at end of file
index 9027635..85e4899 100644 (file)
@@ -2,6 +2,7 @@
 // The .NET Foundation licenses this file to you under the MIT license.
 // See the LICENSE file in the project root for more information.
 
+using System.Buffers;
 using System.Threading;
 using System.Threading.Tasks;
 
@@ -10,7 +11,7 @@ namespace System.IO.Pipelines
     /// <summary>
     /// Defines a class that provides access to a read side of pipe.
     /// </summary>
-    public abstract class PipeReader
+    public abstract partial class PipeReader
     {
         /// <summary>
         /// Attempt to synchronously read data the <see cref="PipeReader"/>.
@@ -62,5 +63,64 @@ namespace System.IO.Pipelines
         /// Cancel the pending <see cref="ReadAsync"/> operation. If there is none, cancels next <see cref="ReadAsync"/> operation, without completing the <see cref="PipeWriter"/>.
         /// </summary>
         public abstract void OnWriterCompleted(Action<Exception, object> callback, object state);
+
+        /// <summary>
+        /// Asynchronously reads the bytes from the <see cref="PipeReader"/> and writes them to the specified stream, using a specified buffer size and cancellation token.
+        /// </summary>
+        /// <param name="destination">The stream to which the contents of the current stream will be copied.</param>
+        /// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
+        /// <returns>A task that represents the asynchronous copy operation.</returns>
+        public virtual Task CopyToAsync(Stream destination, CancellationToken cancellationToken = default)
+        {
+            if (destination == null)
+            {
+                throw new ArgumentNullException(nameof(destination));
+            }
+
+            if (cancellationToken.IsCancellationRequested)
+            {
+                return Task.FromCanceled(cancellationToken);
+            }
+
+            return CopyToAsyncCore(destination, cancellationToken);
+        }
+
+        private async Task CopyToAsyncCore(Stream destination, CancellationToken cancellationToken)
+        {
+            while (true)
+            {
+                SequencePosition consumed = default;
+
+                try
+                {
+                    ReadResult result = await ReadAsync(cancellationToken).ConfigureAwait(false);
+                    ReadOnlySequence<byte> buffer = result.Buffer;
+                    SequencePosition position = buffer.Start;
+
+                    if (result.IsCanceled)
+                    {
+                        throw new OperationCanceledException();
+                    }
+
+                    while (buffer.TryGet(ref position, out ReadOnlyMemory<byte> memory))
+                    {
+                        await destination.WriteAsync(memory, cancellationToken).ConfigureAwait(false);
+
+                        consumed = position;
+                    }
+
+                    if (result.IsCompleted)
+                    {
+                        break;
+                    }
+                }
+                finally
+                {
+                    // Advance even if WriteAsync throws so the PipeReader is not left in the
+                    // currently reading state
+                    AdvanceTo(consumed);
+                }
+            }
+        }
     }
 }
index f8d203e..821fa9e 100644 (file)
@@ -11,7 +11,7 @@ namespace System.IO.Pipelines
     /// <summary>
     /// Defines a class that provides a pipeline to which data can be written.
     /// </summary>
-    public abstract class PipeWriter : IBufferWriter<byte>
+    public abstract partial class PipeWriter : IBufferWriter<byte>
     {
         /// <summary>
         /// Marks the <see cref="PipeWriter"/> as being complete, meaning no more items will be written to it.
@@ -51,5 +51,39 @@ namespace System.IO.Pipelines
             this.Write(source.Span);
             return FlushAsync(cancellationToken);
         }
+
+        /// <summary>
+        /// Asynchronously reads the bytes from the specified stream and writes them to the <see cref="PipeWriter"/>.
+        /// </summary>
+        /// <param name="source">The stream from which the contents will be copied.</param>
+        /// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
+        /// <returns>A task that represents the asynchronous copy operation.</returns>
+        protected internal virtual async Task CopyFromAsync(Stream source, CancellationToken cancellationToken = default)
+        {
+            while (true)
+            {
+                Memory<byte> buffer = GetMemory();
+                int read = await source.ReadAsync(buffer, cancellationToken).ConfigureAwait(false);
+
+                if (read == 0)
+                {
+                    break;
+                }
+
+                Advance(read);
+
+                FlushResult result = await FlushAsync(cancellationToken).ConfigureAwait(false);
+
+                if (result.IsCanceled)
+                {
+                    throw new OperationCanceledException();
+                }
+
+                if (result.IsCompleted)
+                {
+                    break;
+                }
+            }
+        }
     }
 }
diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamExtensions.netstandard.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamExtensions.netstandard.cs
new file mode 100644 (file)
index 0000000..6d979ee
--- /dev/null
@@ -0,0 +1,71 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Buffers;
+using System.Collections.Generic;
+using System.Runtime.InteropServices;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.IO.Pipelines
+{
+    // Helpers to write Memory<byte> to Stream on netstandard 2.0
+    internal static class StreamExtensions
+    {
+        public static ValueTask<int> ReadAsync(this Stream stream, Memory<byte> buffer, CancellationToken cancellationToken = default)
+        {
+            if (MemoryMarshal.TryGetArray(buffer, out ArraySegment<byte> array))
+            {
+                return new ValueTask<int>(stream.ReadAsync(array.Array, array.Offset, array.Count, cancellationToken));
+            }
+            else
+            {
+                byte[] sharedBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length);
+                return FinishReadAsync(stream.ReadAsync(sharedBuffer, 0, buffer.Length, cancellationToken), sharedBuffer, buffer);
+
+                async ValueTask<int> FinishReadAsync(Task<int> readTask, byte[] localBuffer, Memory<byte> localDestination)
+                {
+                    try
+                    {
+                        int result = await readTask.ConfigureAwait(false);
+                        new Span<byte>(localBuffer, 0, result).CopyTo(localDestination.Span);
+                        return result;
+                    }
+                    finally
+                    {
+                        ArrayPool<byte>.Shared.Return(localBuffer);
+                    }
+                }
+            }
+        }
+
+        public static ValueTask WriteAsync(this Stream stream, ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
+        {
+            if (MemoryMarshal.TryGetArray(buffer, out ArraySegment<byte> array))
+            {
+                return new ValueTask(stream.WriteAsync(array.Array, array.Offset, array.Count, cancellationToken));
+            }
+            else
+            {
+                byte[] sharedBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length);
+                buffer.Span.CopyTo(sharedBuffer);
+                return new ValueTask(FinishWriteAsync(stream.WriteAsync(sharedBuffer, 0, buffer.Length, cancellationToken), sharedBuffer));
+            }
+        }
+
+        private static async Task FinishWriteAsync(Task writeTask, byte[] localBuffer)
+        {
+            try
+            {
+                await writeTask.ConfigureAwait(false);
+            }
+            finally
+            {
+                ArrayPool<byte>.Shared.Return(localBuffer);
+            }
+        }
+    }
+}
diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeExtensions.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeExtensions.cs
new file mode 100644 (file)
index 0000000..5ef1ae3
--- /dev/null
@@ -0,0 +1,35 @@
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.IO.Pipelines
+{
+    public static class StreamPipeExtensions
+    {
+        /// <summary>
+        /// Asynchronously reads the bytes from the <see cref="Stream"/> and writes them to the specified <see cref="PipeWriter"/>, using a cancellation token.
+        /// </summary>
+        /// <param name="source">The stream from which the contents of the current stream will be copied.</param>
+        /// <param name="destination">The <see cref="PipeWriter"/> to which the contents of the source stream will be copied.</param>
+        /// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
+        /// <returns>A task that represents the asynchronous copy operation.</returns>
+        public static Task CopyToAsync(this Stream source, PipeWriter destination, CancellationToken cancellationToken = default)
+        {
+            if (source == null)
+            {
+                throw new ArgumentNullException(nameof(source));
+            }
+
+            if (destination == null)
+            {
+                throw new ArgumentNullException(nameof(destination));
+            }
+
+            if (cancellationToken.IsCancellationRequested)
+            {
+                return Task.FromCanceled(cancellationToken);
+            }
+
+            return destination.CopyFromAsync(source, cancellationToken);
+        }
+    }
+}
diff --git a/src/libraries/System.IO.Pipelines/tests/PipeReaderCopyToAsyncTests.cs b/src/libraries/System.IO.Pipelines/tests/PipeReaderCopyToAsyncTests.cs
new file mode 100644 (file)
index 0000000..d33e700
--- /dev/null
@@ -0,0 +1,268 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Xunit;
+
+namespace System.IO.Pipelines.Tests
+{
+    public class CopyToAsyncTests
+    {
+        private static readonly PipeOptions s_testOptions = new PipeOptions(readerScheduler: PipeScheduler.Inline);
+
+        [Fact]
+        public async Task CopyToAsyncThrowsArgumentNullExceptionForNullDestination()
+        {
+            var pipe = new Pipe(s_testOptions);
+            var ex = await Assert.ThrowsAsync<ArgumentNullException>(() => pipe.Reader.CopyToAsync(null));
+            Assert.Equal("destination", ex.ParamName);
+        }
+
+        [Fact]
+        public async Task CopyToAsyncThrowsTaskCanceledExceptionForAlreadyCancelledToken()
+        {
+            var pipe = new Pipe(s_testOptions);
+            await Assert.ThrowsAsync<TaskCanceledException>(() => pipe.Reader.CopyToAsync(new MemoryStream(), new CancellationToken(true)));
+        }
+
+        [Fact]
+        public async Task CopyToAsyncWorks()
+        {
+            var helloBytes = Encoding.UTF8.GetBytes("Hello World");
+
+            var pipe = new Pipe(s_testOptions);
+            await pipe.Writer.WriteAsync(helloBytes);
+            pipe.Writer.Complete();
+
+            var stream = new MemoryStream();
+            await pipe.Reader.CopyToAsync(stream);
+            pipe.Reader.Complete();
+
+            Assert.Equal(helloBytes, stream.ToArray());
+        }
+
+        [Fact]
+        public async Task MultiSegmentWritesWorks()
+        {
+            using (var pool = new TestMemoryPool())
+            {
+                var pipe = new Pipe(new PipeOptions(pool: pool, readerScheduler: PipeScheduler.Inline));
+                pipe.Writer.WriteEmpty(4096);
+                pipe.Writer.WriteEmpty(4096);
+                pipe.Writer.WriteEmpty(4096);
+                await pipe.Writer.FlushAsync();
+                pipe.Writer.Complete();
+
+                var stream = new MemoryStream();
+                await pipe.Reader.CopyToAsync(stream);
+                pipe.Reader.Complete();
+
+                Assert.Equal(4096 * 3, stream.Length);
+            }
+        }
+
+        [Fact]
+        public async Task MultiSegmentWritesUntilFailure()
+        {
+            using (var pool = new TestMemoryPool())
+            {
+                var pipe = new Pipe(new PipeOptions(pool: pool, readerScheduler: PipeScheduler.Inline));
+                pipe.Writer.WriteEmpty(4096);
+                pipe.Writer.WriteEmpty(4096);
+                pipe.Writer.WriteEmpty(4096);
+                await pipe.Writer.FlushAsync();
+                pipe.Writer.Complete();
+
+                var stream = new ThrowAfterNWritesStream(2);
+                await Assert.ThrowsAsync<InvalidOperationException>(() => pipe.Reader.CopyToAsync(stream));
+
+                ReadResult result = await pipe.Reader.ReadAsync();
+                Assert.Equal(4096, result.Buffer.Length);
+                pipe.Reader.Complete();
+            }
+        }
+
+        [Fact]
+        public async Task EmptyBufferNotWrittenToStream()
+        {
+            var pipe = new Pipe(s_testOptions);
+            pipe.Writer.Complete();
+
+            var stream = new ThrowingStream();
+            await pipe.Reader.CopyToAsync(stream);
+            pipe.Reader.Complete();
+        }
+
+        [Fact]
+        public async Task CancelingThePendingReadThrowsOperationCancelledException()
+        {
+            var pipe = new Pipe(s_testOptions);
+            var stream = new MemoryStream();
+            Task task = pipe.Reader.CopyToAsync(stream);
+
+            pipe.Reader.CancelPendingRead();
+
+            await Assert.ThrowsAsync<OperationCanceledException>(() => task);
+        }
+
+        [Fact]
+        public async Task CancelingViaCancellationTokenThrowsOperationCancelledException()
+        {
+            var pipe = new Pipe(s_testOptions);
+            var stream = new MemoryStream();
+            var cts = new CancellationTokenSource();
+            Task task = pipe.Reader.CopyToAsync(stream, cts.Token);
+
+            cts.Cancel();
+
+            await Assert.ThrowsAsync<OperationCanceledException>(() => task);
+        }
+
+        [Fact]
+        public async Task CancelingStreamViaCancellationTokenThrowsOperationCancelledException()
+        {
+            var pipe = new Pipe(s_testOptions);
+            var stream = new CancelledWritesStream();
+            var cts = new CancellationTokenSource();
+            Task task = pipe.Reader.CopyToAsync(stream, cts.Token);
+
+            // Call write async inline, this will yield when it hits the tcs
+            pipe.Writer.WriteEmpty(10);
+            await pipe.Writer.FlushAsync();
+
+            // Then cancel
+            cts.Cancel();
+
+            // Now resume the write which should result in an exception
+            stream.WaitForWriteTask.TrySetResult(null);
+
+            await Assert.ThrowsAsync<OperationCanceledException>(() => task);
+        }
+
+        [Fact]
+        public async Task ThrowingFromStreamDoesNotLeavePipeReaderInBrokenState()
+        {
+            var pipe = new Pipe(s_testOptions);
+            var stream = new ThrowingStream();
+            Task task = pipe.Reader.CopyToAsync(stream);
+
+            pipe.Writer.WriteEmpty(10);
+            await pipe.Writer.FlushAsync();
+
+            await Assert.ThrowsAsync<InvalidOperationException>(() => task);
+
+            pipe.Writer.WriteEmpty(10);
+            await pipe.Writer.FlushAsync();
+            pipe.Writer.Complete();
+
+            ReadResult result = await pipe.Reader.ReadAsync();
+            Assert.True(result.IsCompleted);
+            Assert.Equal(20, result.Buffer.Length);
+            pipe.Reader.Complete();
+        }
+
+        private class CancelledWritesStream : WriteOnlyStream
+        {
+            public TaskCompletionSource<object> WaitForWriteTask = new TaskCompletionSource<object>(TaskContinuationOptions.RunContinuationsAsynchronously);
+
+            public override void Write(byte[] buffer, int offset, int count)
+            {
+                throw new NotImplementedException();
+            }
+
+            public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+            {
+                await WaitForWriteTask.Task;
+
+                cancellationToken.ThrowIfCancellationRequested();
+            }
+
+#if !netstandard
+            public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
+            {
+                await WaitForWriteTask.Task;
+
+                cancellationToken.ThrowIfCancellationRequested();
+            }
+#endif
+        }
+        private class ThrowingStream : ThrowAfterNWritesStream
+        {
+            public ThrowingStream() : base(0)
+            {
+            }
+        }
+
+        private class ThrowAfterNWritesStream : WriteOnlyStream
+        {
+            private readonly int _maxWrites;
+            private int _writes;
+
+            public ThrowAfterNWritesStream(int maxWrites)
+            {
+                _maxWrites = maxWrites;
+            }
+
+            public override void Write(byte[] buffer, int offset, int count)
+            {
+                throw new InvalidOperationException();
+            }
+
+            public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+            {
+                if (_writes >= _maxWrites)
+                {
+                    throw new InvalidOperationException();
+                }
+                _writes++;
+                return Task.CompletedTask;
+            }
+
+#if !netstandard
+            public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
+            {
+                if (_writes >= _maxWrites)
+                {
+                    throw new InvalidOperationException();
+                }
+                _writes++;
+                return default;
+            }
+#endif
+        }
+
+        private abstract class WriteOnlyStream : Stream
+        {
+            public override bool CanRead => false;
+
+            public override bool CanSeek => false;
+
+            public override bool CanWrite => true;
+
+            public override long Length => throw new NotSupportedException();
+
+            public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
+
+            public override void Flush()
+            {
+                throw new InvalidOperationException();
+            }
+
+            public override int Read(byte[] buffer, int offset, int count)
+            {
+                throw new NotSupportedException();
+            }
+
+            public override long Seek(long offset, SeekOrigin origin)
+            {
+                throw new NotSupportedException();
+            }
+
+            public override void SetLength(long value)
+            {
+                throw new NotSupportedException();
+            }
+        }
+    }
+}
diff --git a/src/libraries/System.IO.Pipelines/tests/PipeWriterCopyToAsyncTests.cs b/src/libraries/System.IO.Pipelines/tests/PipeWriterCopyToAsyncTests.cs
new file mode 100644 (file)
index 0000000..dd7b878
--- /dev/null
@@ -0,0 +1,215 @@
+using System;
+using System.Buffers;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Xunit;
+
+namespace System.IO.Pipelines.Tests
+{
+    public class PipeWriterCopyToAsyncTests
+    {
+        [Fact]
+        public async Task CopyToAsyncThrowsArgumentNullExceptionForNullSource()
+        {
+            var pipe = new Pipe();
+            MemoryStream stream = null;
+            var ex = await Assert.ThrowsAsync<ArgumentNullException>(() => stream.CopyToAsync(pipe.Writer));
+            Assert.Equal("source", ex.ParamName);
+        }
+
+        [Fact]
+        public async Task CopyToAsyncThrowsArgumentNullExceptionForNullDestination()
+        {
+            var stream = new MemoryStream();
+            var ex = await Assert.ThrowsAsync<ArgumentNullException>(() => stream.CopyToAsync(null));
+            Assert.Equal("destination", ex.ParamName);
+        }
+
+        [Fact]
+        public async Task CopyToAsyncThrowsTaskCanceledExceptionForAlreadyCancelledToken()
+        {
+            var pipe = new Pipe();
+            await Assert.ThrowsAsync<TaskCanceledException>(() => new MemoryStream().CopyToAsync(pipe.Writer, new CancellationToken(true)));
+        }
+
+        [Fact]
+        public async Task CopyToAsyncWorks()
+        {
+            var helloBytes = Encoding.UTF8.GetBytes("Hello World");
+
+            var pipe = new Pipe();
+            var stream = new MemoryStream(helloBytes);
+            await stream.CopyToAsync(pipe.Writer);
+
+            ReadResult result = await pipe.Reader.ReadAsync();
+
+            Assert.Equal(helloBytes, result.Buffer.ToArray());
+
+            pipe.Reader.AdvanceTo(result.Buffer.End);
+            pipe.Reader.Complete();
+            pipe.Writer.Complete();
+        }
+
+        [Fact]
+        public async Task CopyToAsyncCalledMultipleTimesWorks()
+        {
+            var hello = "Hello World";
+            var helloBytes = Encoding.UTF8.GetBytes(hello);
+            var expected = Encoding.UTF8.GetBytes(hello + hello + hello);
+
+            var pipe = new Pipe();
+            await new MemoryStream(helloBytes).CopyToAsync(pipe.Writer);
+            await new MemoryStream(helloBytes).CopyToAsync(pipe.Writer);
+            await new MemoryStream(helloBytes).CopyToAsync(pipe.Writer);
+            pipe.Writer.Complete();
+
+            ReadResult result = await pipe.Reader.ReadAsync();
+
+            Assert.Equal(expected, result.Buffer.ToArray());
+
+            pipe.Reader.AdvanceTo(result.Buffer.End);
+            pipe.Reader.Complete();
+        }
+
+        [Fact]
+        public async Task StreamCopyToAsyncWorks()
+        {
+            var helloBytes = Encoding.UTF8.GetBytes("Hello World");
+
+            var pipe = new Pipe();
+            var stream = new MemoryStream(helloBytes);
+            await stream.CopyToAsync(pipe.Writer);
+
+            ReadResult result = await pipe.Reader.ReadAsync();
+
+            Assert.Equal(helloBytes, result.Buffer.ToArray());
+
+            pipe.Reader.AdvanceTo(result.Buffer.End);
+            pipe.Reader.Complete();
+        }
+
+        [Fact]
+        public async Task CancelingViaCancelPendingFlushThrows()
+        {
+            var helloBytes = Encoding.UTF8.GetBytes("Hello World");
+
+            var pipe = new Pipe(new PipeOptions(pauseWriterThreshold: helloBytes.Length - 1, resumeWriterThreshold: 0));
+            var stream = new MemoryStream(helloBytes);
+            Task task = stream.CopyToAsync(pipe.Writer);
+
+            Assert.False(task.IsCompleted);
+
+            pipe.Writer.CancelPendingFlush();
+
+            await Assert.ThrowsAsync<OperationCanceledException>(() => task);
+
+            pipe.Writer.Complete();
+            pipe.Reader.Complete();
+        }
+
+        [Fact]
+        public async Task CancelingViaCancellationTokenThrows()
+        {
+            var helloBytes = Encoding.UTF8.GetBytes("Hello World");
+
+            var pipe = new Pipe(new PipeOptions(pauseWriterThreshold: helloBytes.Length - 1, resumeWriterThreshold: 0));
+            var stream = new MemoryStream(helloBytes);
+            var cts = new CancellationTokenSource();
+            Task task = stream.CopyToAsync(pipe.Writer, cts.Token);
+
+            Assert.False(task.IsCompleted);
+
+            cts.Cancel();
+
+            await Assert.ThrowsAsync<OperationCanceledException>(() => task);
+
+            pipe.Writer.Complete();
+            pipe.Reader.Complete();
+        }
+
+        [Fact]
+        public async Task CancelingStreamViaCancellationTokenThrows()
+        {
+            var pipe = new Pipe();
+            var stream = new CancelledReadsStream();
+            var cts = new CancellationTokenSource();
+            Task task = stream.CopyToAsync(pipe.Writer, cts.Token);
+
+            Assert.False(task.IsCompleted);
+
+            cts.Cancel();
+
+            stream.WaitForReadTask.TrySetResult(null);
+
+            await Assert.ThrowsAsync<OperationCanceledException>(() => task);
+
+            pipe.Writer.Complete();
+            pipe.Reader.Complete();
+        }
+
+        private class CancelledReadsStream : ReadOnlyStream
+        {
+            public TaskCompletionSource<object> WaitForReadTask = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
+
+            public override int Read(byte[] buffer, int offset, int count)
+            {
+                throw new NotImplementedException();
+            }
+            
+            public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+            {
+                await WaitForReadTask.Task;
+
+                cancellationToken.ThrowIfCancellationRequested();
+
+                return 0;
+            }
+
+#if !netstandard
+            public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
+            {
+                await WaitForReadTask.Task;
+
+                cancellationToken.ThrowIfCancellationRequested();
+
+                return 0;
+            }
+#endif
+        }
+
+        private abstract class ReadOnlyStream : Stream
+        {
+            public override bool CanRead => true;
+
+            public override bool CanSeek => false;
+
+            public override bool CanWrite => false;
+
+            public override long Length => throw new NotSupportedException();
+
+            public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
+
+            public override void Flush()
+            {
+                throw new NotSupportedException();
+            }
+
+            public override long Seek(long offset, SeekOrigin origin)
+            {
+                throw new NotSupportedException();
+            }
+
+            public override void SetLength(long value)
+            {
+                throw new NotSupportedException();
+            }
+
+            public override void Write(byte[] buffer, int offset, int count)
+            {
+                throw new NotSupportedException();
+            }
+        }
+    }
+}
index ac63118..2f6f1e5 100644 (file)
@@ -11,6 +11,7 @@
   </ItemGroup>
   <ItemGroup>
     <Compile Include="BackpressureTests.cs" />
+    <Compile Include="PipeReaderCopyToAsyncTests.cs" />
     <Compile Include="FlushAsyncCancellationTests.cs" />
     <Compile Include="FlushAsyncCompletionTests.cs" />
     <Compile Include="FlushAsyncTests.cs" />
@@ -21,6 +22,7 @@
     <Compile Include="PipePoolTests.cs" />
     <Compile Include="PipeResetTests.cs" />
     <Compile Include="PipeTest.cs" />
+    <Compile Include="PipeWriterCopyToAsyncTests.cs" />
     <Compile Include="PipeWriterTests.cs" />
     <Compile Include="ReadAsyncCancellationTests.cs" />
     <Compile Include="ReadAsyncCompletionTests.cs" />