From: David Fowler Date: Sat, 9 Feb 2019 23:52:29 +0000 (-0800) Subject: Added methods to copy to and from streams (dotnet/corefx#35212) X-Git-Tag: submit/tizen/20210909.063632~11031^2~2494 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=fe1157edf254076052379da0660967486433c6c5;p=platform%2Fupstream%2Fdotnet%2Fruntime.git Added methods to copy to and from streams (dotnet/corefx#35212) - Added Stream.CopyToAsync(PipeWriter) - Added PipeWriter.CopyFromAsync(Stream) - Added PipeReader.CopyToAsync(stream) Commit migrated from https://github.com/dotnet/corefx/commit/603af6f2d84d9970a063bb96fbd4a2461ee3de42 --- diff --git a/src/libraries/System.IO.Pipelines/ref/System.IO.Pipelines.cs b/src/libraries/System.IO.Pipelines/ref/System.IO.Pipelines.cs index 4f92884..7382572 100644 --- a/src/libraries/System.IO.Pipelines/ref/System.IO.Pipelines.cs +++ b/src/libraries/System.IO.Pipelines/ref/System.IO.Pipelines.cs @@ -9,7 +9,7 @@ namespace System.IO.Pipelines { public partial struct FlushResult { - private int _dummy; + private int _dummyPrimitive; public FlushResult(bool isCanceled, bool isCompleted) { throw null; } public bool IsCanceled { get { throw null; } } public bool IsCompleted { get { throw null; } } @@ -46,6 +46,7 @@ namespace System.IO.Pipelines public abstract void AdvanceTo(System.SequencePosition consumed, System.SequencePosition examined); public abstract void CancelPendingRead(); public abstract void Complete(System.Exception exception = null); + public virtual System.Threading.Tasks.Task CopyToAsync(System.IO.Stream destination, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public abstract void OnWriterCompleted(System.Action callback, object state); public abstract System.Threading.Tasks.ValueTask ReadAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)); public abstract bool TryRead(out System.IO.Pipelines.ReadResult result); @@ -63,6 +64,7 @@ namespace System.IO.Pipelines public abstract void Advance(int bytes); public abstract void CancelPendingFlush(); public abstract void Complete(System.Exception exception = null); + protected internal virtual System.Threading.Tasks.Task CopyFromAsync(System.IO.Stream source, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public abstract System.Threading.Tasks.ValueTask FlushAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)); public abstract System.Memory GetMemory(int sizeHint = 0); public abstract System.Span GetSpan(int sizeHint = 0); @@ -72,9 +74,14 @@ namespace System.IO.Pipelines public readonly partial struct ReadResult { private readonly object _dummy; + private readonly int _dummyPrimitive; public ReadResult(System.Buffers.ReadOnlySequence buffer, bool isCanceled, bool isCompleted) { throw null; } public System.Buffers.ReadOnlySequence Buffer { get { throw null; } } public bool IsCanceled { get { throw null; } } public bool IsCompleted { get { throw null; } } } + public static partial class StreamPipeExtensions + { + public static System.Threading.Tasks.Task CopyToAsync(this System.IO.Stream source, System.IO.Pipelines.PipeWriter destination, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + } } diff --git a/src/libraries/System.IO.Pipelines/src/System.IO.Pipelines.csproj b/src/libraries/System.IO.Pipelines/src/System.IO.Pipelines.csproj index 9843486..2758ac3 100644 --- a/src/libraries/System.IO.Pipelines/src/System.IO.Pipelines.csproj +++ b/src/libraries/System.IO.Pipelines/src/System.IO.Pipelines.csproj @@ -24,12 +24,14 @@ + + @@ -43,6 +45,7 @@ + \ No newline at end of file diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeReader.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeReader.cs index 9027635..85e4899 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeReader.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeReader.cs @@ -2,6 +2,7 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. +using System.Buffers; using System.Threading; using System.Threading.Tasks; @@ -10,7 +11,7 @@ namespace System.IO.Pipelines /// /// Defines a class that provides access to a read side of pipe. /// - public abstract class PipeReader + public abstract partial class PipeReader { /// /// Attempt to synchronously read data the . @@ -62,5 +63,64 @@ namespace System.IO.Pipelines /// Cancel the pending operation. If there is none, cancels next operation, without completing the . /// public abstract void OnWriterCompleted(Action callback, object state); + + /// + /// Asynchronously reads the bytes from the and writes them to the specified stream, using a specified buffer size and cancellation token. + /// + /// The stream to which the contents of the current stream will be copied. + /// The token to monitor for cancellation requests. The default value is . + /// A task that represents the asynchronous copy operation. + public virtual Task CopyToAsync(Stream destination, CancellationToken cancellationToken = default) + { + if (destination == null) + { + throw new ArgumentNullException(nameof(destination)); + } + + if (cancellationToken.IsCancellationRequested) + { + return Task.FromCanceled(cancellationToken); + } + + return CopyToAsyncCore(destination, cancellationToken); + } + + private async Task CopyToAsyncCore(Stream destination, CancellationToken cancellationToken) + { + while (true) + { + SequencePosition consumed = default; + + try + { + ReadResult result = await ReadAsync(cancellationToken).ConfigureAwait(false); + ReadOnlySequence buffer = result.Buffer; + SequencePosition position = buffer.Start; + + if (result.IsCanceled) + { + throw new OperationCanceledException(); + } + + while (buffer.TryGet(ref position, out ReadOnlyMemory memory)) + { + await destination.WriteAsync(memory, cancellationToken).ConfigureAwait(false); + + consumed = position; + } + + if (result.IsCompleted) + { + break; + } + } + finally + { + // Advance even if WriteAsync throws so the PipeReader is not left in the + // currently reading state + AdvanceTo(consumed); + } + } + } } } diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeWriter.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeWriter.cs index f8d203e..821fa9e 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeWriter.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeWriter.cs @@ -11,7 +11,7 @@ namespace System.IO.Pipelines /// /// Defines a class that provides a pipeline to which data can be written. /// - public abstract class PipeWriter : IBufferWriter + public abstract partial class PipeWriter : IBufferWriter { /// /// Marks the as being complete, meaning no more items will be written to it. @@ -51,5 +51,39 @@ namespace System.IO.Pipelines this.Write(source.Span); return FlushAsync(cancellationToken); } + + /// + /// Asynchronously reads the bytes from the specified stream and writes them to the . + /// + /// The stream from which the contents will be copied. + /// The token to monitor for cancellation requests. The default value is . + /// A task that represents the asynchronous copy operation. + protected internal virtual async Task CopyFromAsync(Stream source, CancellationToken cancellationToken = default) + { + while (true) + { + Memory buffer = GetMemory(); + int read = await source.ReadAsync(buffer, cancellationToken).ConfigureAwait(false); + + if (read == 0) + { + break; + } + + Advance(read); + + FlushResult result = await FlushAsync(cancellationToken).ConfigureAwait(false); + + if (result.IsCanceled) + { + throw new OperationCanceledException(); + } + + if (result.IsCompleted) + { + break; + } + } + } } } diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamExtensions.netstandard.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamExtensions.netstandard.cs new file mode 100644 index 0000000..6d979ee --- /dev/null +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamExtensions.netstandard.cs @@ -0,0 +1,71 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; +using System.Buffers; +using System.Collections.Generic; +using System.Runtime.InteropServices; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace System.IO.Pipelines +{ + // Helpers to write Memory to Stream on netstandard 2.0 + internal static class StreamExtensions + { + public static ValueTask ReadAsync(this Stream stream, Memory buffer, CancellationToken cancellationToken = default) + { + if (MemoryMarshal.TryGetArray(buffer, out ArraySegment array)) + { + return new ValueTask(stream.ReadAsync(array.Array, array.Offset, array.Count, cancellationToken)); + } + else + { + byte[] sharedBuffer = ArrayPool.Shared.Rent(buffer.Length); + return FinishReadAsync(stream.ReadAsync(sharedBuffer, 0, buffer.Length, cancellationToken), sharedBuffer, buffer); + + async ValueTask FinishReadAsync(Task readTask, byte[] localBuffer, Memory localDestination) + { + try + { + int result = await readTask.ConfigureAwait(false); + new Span(localBuffer, 0, result).CopyTo(localDestination.Span); + return result; + } + finally + { + ArrayPool.Shared.Return(localBuffer); + } + } + } + } + + public static ValueTask WriteAsync(this Stream stream, ReadOnlyMemory buffer, CancellationToken cancellationToken = default) + { + if (MemoryMarshal.TryGetArray(buffer, out ArraySegment array)) + { + return new ValueTask(stream.WriteAsync(array.Array, array.Offset, array.Count, cancellationToken)); + } + else + { + byte[] sharedBuffer = ArrayPool.Shared.Rent(buffer.Length); + buffer.Span.CopyTo(sharedBuffer); + return new ValueTask(FinishWriteAsync(stream.WriteAsync(sharedBuffer, 0, buffer.Length, cancellationToken), sharedBuffer)); + } + } + + private static async Task FinishWriteAsync(Task writeTask, byte[] localBuffer) + { + try + { + await writeTask.ConfigureAwait(false); + } + finally + { + ArrayPool.Shared.Return(localBuffer); + } + } + } +} diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeExtensions.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeExtensions.cs new file mode 100644 index 0000000..5ef1ae3 --- /dev/null +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeExtensions.cs @@ -0,0 +1,35 @@ +using System.Threading; +using System.Threading.Tasks; + +namespace System.IO.Pipelines +{ + public static class StreamPipeExtensions + { + /// + /// Asynchronously reads the bytes from the and writes them to the specified , using a cancellation token. + /// + /// The stream from which the contents of the current stream will be copied. + /// The to which the contents of the source stream will be copied. + /// The token to monitor for cancellation requests. The default value is . + /// A task that represents the asynchronous copy operation. + public static Task CopyToAsync(this Stream source, PipeWriter destination, CancellationToken cancellationToken = default) + { + if (source == null) + { + throw new ArgumentNullException(nameof(source)); + } + + if (destination == null) + { + throw new ArgumentNullException(nameof(destination)); + } + + if (cancellationToken.IsCancellationRequested) + { + return Task.FromCanceled(cancellationToken); + } + + return destination.CopyFromAsync(source, cancellationToken); + } + } +} diff --git a/src/libraries/System.IO.Pipelines/tests/PipeReaderCopyToAsyncTests.cs b/src/libraries/System.IO.Pipelines/tests/PipeReaderCopyToAsyncTests.cs new file mode 100644 index 0000000..d33e700 --- /dev/null +++ b/src/libraries/System.IO.Pipelines/tests/PipeReaderCopyToAsyncTests.cs @@ -0,0 +1,268 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace System.IO.Pipelines.Tests +{ + public class CopyToAsyncTests + { + private static readonly PipeOptions s_testOptions = new PipeOptions(readerScheduler: PipeScheduler.Inline); + + [Fact] + public async Task CopyToAsyncThrowsArgumentNullExceptionForNullDestination() + { + var pipe = new Pipe(s_testOptions); + var ex = await Assert.ThrowsAsync(() => pipe.Reader.CopyToAsync(null)); + Assert.Equal("destination", ex.ParamName); + } + + [Fact] + public async Task CopyToAsyncThrowsTaskCanceledExceptionForAlreadyCancelledToken() + { + var pipe = new Pipe(s_testOptions); + await Assert.ThrowsAsync(() => pipe.Reader.CopyToAsync(new MemoryStream(), new CancellationToken(true))); + } + + [Fact] + public async Task CopyToAsyncWorks() + { + var helloBytes = Encoding.UTF8.GetBytes("Hello World"); + + var pipe = new Pipe(s_testOptions); + await pipe.Writer.WriteAsync(helloBytes); + pipe.Writer.Complete(); + + var stream = new MemoryStream(); + await pipe.Reader.CopyToAsync(stream); + pipe.Reader.Complete(); + + Assert.Equal(helloBytes, stream.ToArray()); + } + + [Fact] + public async Task MultiSegmentWritesWorks() + { + using (var pool = new TestMemoryPool()) + { + var pipe = new Pipe(new PipeOptions(pool: pool, readerScheduler: PipeScheduler.Inline)); + pipe.Writer.WriteEmpty(4096); + pipe.Writer.WriteEmpty(4096); + pipe.Writer.WriteEmpty(4096); + await pipe.Writer.FlushAsync(); + pipe.Writer.Complete(); + + var stream = new MemoryStream(); + await pipe.Reader.CopyToAsync(stream); + pipe.Reader.Complete(); + + Assert.Equal(4096 * 3, stream.Length); + } + } + + [Fact] + public async Task MultiSegmentWritesUntilFailure() + { + using (var pool = new TestMemoryPool()) + { + var pipe = new Pipe(new PipeOptions(pool: pool, readerScheduler: PipeScheduler.Inline)); + pipe.Writer.WriteEmpty(4096); + pipe.Writer.WriteEmpty(4096); + pipe.Writer.WriteEmpty(4096); + await pipe.Writer.FlushAsync(); + pipe.Writer.Complete(); + + var stream = new ThrowAfterNWritesStream(2); + await Assert.ThrowsAsync(() => pipe.Reader.CopyToAsync(stream)); + + ReadResult result = await pipe.Reader.ReadAsync(); + Assert.Equal(4096, result.Buffer.Length); + pipe.Reader.Complete(); + } + } + + [Fact] + public async Task EmptyBufferNotWrittenToStream() + { + var pipe = new Pipe(s_testOptions); + pipe.Writer.Complete(); + + var stream = new ThrowingStream(); + await pipe.Reader.CopyToAsync(stream); + pipe.Reader.Complete(); + } + + [Fact] + public async Task CancelingThePendingReadThrowsOperationCancelledException() + { + var pipe = new Pipe(s_testOptions); + var stream = new MemoryStream(); + Task task = pipe.Reader.CopyToAsync(stream); + + pipe.Reader.CancelPendingRead(); + + await Assert.ThrowsAsync(() => task); + } + + [Fact] + public async Task CancelingViaCancellationTokenThrowsOperationCancelledException() + { + var pipe = new Pipe(s_testOptions); + var stream = new MemoryStream(); + var cts = new CancellationTokenSource(); + Task task = pipe.Reader.CopyToAsync(stream, cts.Token); + + cts.Cancel(); + + await Assert.ThrowsAsync(() => task); + } + + [Fact] + public async Task CancelingStreamViaCancellationTokenThrowsOperationCancelledException() + { + var pipe = new Pipe(s_testOptions); + var stream = new CancelledWritesStream(); + var cts = new CancellationTokenSource(); + Task task = pipe.Reader.CopyToAsync(stream, cts.Token); + + // Call write async inline, this will yield when it hits the tcs + pipe.Writer.WriteEmpty(10); + await pipe.Writer.FlushAsync(); + + // Then cancel + cts.Cancel(); + + // Now resume the write which should result in an exception + stream.WaitForWriteTask.TrySetResult(null); + + await Assert.ThrowsAsync(() => task); + } + + [Fact] + public async Task ThrowingFromStreamDoesNotLeavePipeReaderInBrokenState() + { + var pipe = new Pipe(s_testOptions); + var stream = new ThrowingStream(); + Task task = pipe.Reader.CopyToAsync(stream); + + pipe.Writer.WriteEmpty(10); + await pipe.Writer.FlushAsync(); + + await Assert.ThrowsAsync(() => task); + + pipe.Writer.WriteEmpty(10); + await pipe.Writer.FlushAsync(); + pipe.Writer.Complete(); + + ReadResult result = await pipe.Reader.ReadAsync(); + Assert.True(result.IsCompleted); + Assert.Equal(20, result.Buffer.Length); + pipe.Reader.Complete(); + } + + private class CancelledWritesStream : WriteOnlyStream + { + public TaskCompletionSource WaitForWriteTask = new TaskCompletionSource(TaskContinuationOptions.RunContinuationsAsynchronously); + + public override void Write(byte[] buffer, int offset, int count) + { + throw new NotImplementedException(); + } + + public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + await WaitForWriteTask.Task; + + cancellationToken.ThrowIfCancellationRequested(); + } + +#if !netstandard + public override async ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) + { + await WaitForWriteTask.Task; + + cancellationToken.ThrowIfCancellationRequested(); + } +#endif + } + private class ThrowingStream : ThrowAfterNWritesStream + { + public ThrowingStream() : base(0) + { + } + } + + private class ThrowAfterNWritesStream : WriteOnlyStream + { + private readonly int _maxWrites; + private int _writes; + + public ThrowAfterNWritesStream(int maxWrites) + { + _maxWrites = maxWrites; + } + + public override void Write(byte[] buffer, int offset, int count) + { + throw new InvalidOperationException(); + } + + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + if (_writes >= _maxWrites) + { + throw new InvalidOperationException(); + } + _writes++; + return Task.CompletedTask; + } + +#if !netstandard + public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) + { + if (_writes >= _maxWrites) + { + throw new InvalidOperationException(); + } + _writes++; + return default; + } +#endif + } + + private abstract class WriteOnlyStream : Stream + { + public override bool CanRead => false; + + public override bool CanSeek => false; + + public override bool CanWrite => true; + + public override long Length => throw new NotSupportedException(); + + public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); } + + public override void Flush() + { + throw new InvalidOperationException(); + } + + public override int Read(byte[] buffer, int offset, int count) + { + throw new NotSupportedException(); + } + + public override long Seek(long offset, SeekOrigin origin) + { + throw new NotSupportedException(); + } + + public override void SetLength(long value) + { + throw new NotSupportedException(); + } + } + } +} diff --git a/src/libraries/System.IO.Pipelines/tests/PipeWriterCopyToAsyncTests.cs b/src/libraries/System.IO.Pipelines/tests/PipeWriterCopyToAsyncTests.cs new file mode 100644 index 0000000..dd7b878 --- /dev/null +++ b/src/libraries/System.IO.Pipelines/tests/PipeWriterCopyToAsyncTests.cs @@ -0,0 +1,215 @@ +using System; +using System.Buffers; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace System.IO.Pipelines.Tests +{ + public class PipeWriterCopyToAsyncTests + { + [Fact] + public async Task CopyToAsyncThrowsArgumentNullExceptionForNullSource() + { + var pipe = new Pipe(); + MemoryStream stream = null; + var ex = await Assert.ThrowsAsync(() => stream.CopyToAsync(pipe.Writer)); + Assert.Equal("source", ex.ParamName); + } + + [Fact] + public async Task CopyToAsyncThrowsArgumentNullExceptionForNullDestination() + { + var stream = new MemoryStream(); + var ex = await Assert.ThrowsAsync(() => stream.CopyToAsync(null)); + Assert.Equal("destination", ex.ParamName); + } + + [Fact] + public async Task CopyToAsyncThrowsTaskCanceledExceptionForAlreadyCancelledToken() + { + var pipe = new Pipe(); + await Assert.ThrowsAsync(() => new MemoryStream().CopyToAsync(pipe.Writer, new CancellationToken(true))); + } + + [Fact] + public async Task CopyToAsyncWorks() + { + var helloBytes = Encoding.UTF8.GetBytes("Hello World"); + + var pipe = new Pipe(); + var stream = new MemoryStream(helloBytes); + await stream.CopyToAsync(pipe.Writer); + + ReadResult result = await pipe.Reader.ReadAsync(); + + Assert.Equal(helloBytes, result.Buffer.ToArray()); + + pipe.Reader.AdvanceTo(result.Buffer.End); + pipe.Reader.Complete(); + pipe.Writer.Complete(); + } + + [Fact] + public async Task CopyToAsyncCalledMultipleTimesWorks() + { + var hello = "Hello World"; + var helloBytes = Encoding.UTF8.GetBytes(hello); + var expected = Encoding.UTF8.GetBytes(hello + hello + hello); + + var pipe = new Pipe(); + await new MemoryStream(helloBytes).CopyToAsync(pipe.Writer); + await new MemoryStream(helloBytes).CopyToAsync(pipe.Writer); + await new MemoryStream(helloBytes).CopyToAsync(pipe.Writer); + pipe.Writer.Complete(); + + ReadResult result = await pipe.Reader.ReadAsync(); + + Assert.Equal(expected, result.Buffer.ToArray()); + + pipe.Reader.AdvanceTo(result.Buffer.End); + pipe.Reader.Complete(); + } + + [Fact] + public async Task StreamCopyToAsyncWorks() + { + var helloBytes = Encoding.UTF8.GetBytes("Hello World"); + + var pipe = new Pipe(); + var stream = new MemoryStream(helloBytes); + await stream.CopyToAsync(pipe.Writer); + + ReadResult result = await pipe.Reader.ReadAsync(); + + Assert.Equal(helloBytes, result.Buffer.ToArray()); + + pipe.Reader.AdvanceTo(result.Buffer.End); + pipe.Reader.Complete(); + } + + [Fact] + public async Task CancelingViaCancelPendingFlushThrows() + { + var helloBytes = Encoding.UTF8.GetBytes("Hello World"); + + var pipe = new Pipe(new PipeOptions(pauseWriterThreshold: helloBytes.Length - 1, resumeWriterThreshold: 0)); + var stream = new MemoryStream(helloBytes); + Task task = stream.CopyToAsync(pipe.Writer); + + Assert.False(task.IsCompleted); + + pipe.Writer.CancelPendingFlush(); + + await Assert.ThrowsAsync(() => task); + + pipe.Writer.Complete(); + pipe.Reader.Complete(); + } + + [Fact] + public async Task CancelingViaCancellationTokenThrows() + { + var helloBytes = Encoding.UTF8.GetBytes("Hello World"); + + var pipe = new Pipe(new PipeOptions(pauseWriterThreshold: helloBytes.Length - 1, resumeWriterThreshold: 0)); + var stream = new MemoryStream(helloBytes); + var cts = new CancellationTokenSource(); + Task task = stream.CopyToAsync(pipe.Writer, cts.Token); + + Assert.False(task.IsCompleted); + + cts.Cancel(); + + await Assert.ThrowsAsync(() => task); + + pipe.Writer.Complete(); + pipe.Reader.Complete(); + } + + [Fact] + public async Task CancelingStreamViaCancellationTokenThrows() + { + var pipe = new Pipe(); + var stream = new CancelledReadsStream(); + var cts = new CancellationTokenSource(); + Task task = stream.CopyToAsync(pipe.Writer, cts.Token); + + Assert.False(task.IsCompleted); + + cts.Cancel(); + + stream.WaitForReadTask.TrySetResult(null); + + await Assert.ThrowsAsync(() => task); + + pipe.Writer.Complete(); + pipe.Reader.Complete(); + } + + private class CancelledReadsStream : ReadOnlyStream + { + public TaskCompletionSource WaitForReadTask = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + public override int Read(byte[] buffer, int offset, int count) + { + throw new NotImplementedException(); + } + + public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + await WaitForReadTask.Task; + + cancellationToken.ThrowIfCancellationRequested(); + + return 0; + } + +#if !netstandard + public override async ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) + { + await WaitForReadTask.Task; + + cancellationToken.ThrowIfCancellationRequested(); + + return 0; + } +#endif + } + + private abstract class ReadOnlyStream : Stream + { + public override bool CanRead => true; + + public override bool CanSeek => false; + + public override bool CanWrite => false; + + public override long Length => throw new NotSupportedException(); + + public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); } + + public override void Flush() + { + throw new NotSupportedException(); + } + + public override long Seek(long offset, SeekOrigin origin) + { + throw new NotSupportedException(); + } + + public override void SetLength(long value) + { + throw new NotSupportedException(); + } + + public override void Write(byte[] buffer, int offset, int count) + { + throw new NotSupportedException(); + } + } + } +} diff --git a/src/libraries/System.IO.Pipelines/tests/System.IO.Pipelines.Tests.csproj b/src/libraries/System.IO.Pipelines/tests/System.IO.Pipelines.Tests.csproj index ac63118..2f6f1e5 100644 --- a/src/libraries/System.IO.Pipelines/tests/System.IO.Pipelines.Tests.csproj +++ b/src/libraries/System.IO.Pipelines/tests/System.IO.Pipelines.Tests.csproj @@ -11,6 +11,7 @@ + @@ -21,6 +22,7 @@ +