{
public partial struct FlushResult
{
- private int _dummy;
+ private int _dummyPrimitive;
public FlushResult(bool isCanceled, bool isCompleted) { throw null; }
public bool IsCanceled { get { throw null; } }
public bool IsCompleted { get { throw null; } }
public abstract void AdvanceTo(System.SequencePosition consumed, System.SequencePosition examined);
public abstract void CancelPendingRead();
public abstract void Complete(System.Exception exception = null);
+ public virtual System.Threading.Tasks.Task CopyToAsync(System.IO.Stream destination, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public abstract void OnWriterCompleted(System.Action<System.Exception, object> callback, object state);
public abstract System.Threading.Tasks.ValueTask<System.IO.Pipelines.ReadResult> ReadAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken));
public abstract bool TryRead(out System.IO.Pipelines.ReadResult result);
public abstract void Advance(int bytes);
public abstract void CancelPendingFlush();
public abstract void Complete(System.Exception exception = null);
+ protected internal virtual System.Threading.Tasks.Task CopyFromAsync(System.IO.Stream source, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public abstract System.Threading.Tasks.ValueTask<System.IO.Pipelines.FlushResult> FlushAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken));
public abstract System.Memory<byte> GetMemory(int sizeHint = 0);
public abstract System.Span<byte> GetSpan(int sizeHint = 0);
public readonly partial struct ReadResult
{
private readonly object _dummy;
+ private readonly int _dummyPrimitive;
public ReadResult(System.Buffers.ReadOnlySequence<byte> buffer, bool isCanceled, bool isCompleted) { throw null; }
public System.Buffers.ReadOnlySequence<byte> Buffer { get { throw null; } }
public bool IsCanceled { get { throw null; } }
public bool IsCompleted { get { throw null; } }
}
+ public static partial class StreamPipeExtensions
+ {
+ public static System.Threading.Tasks.Task CopyToAsync(this System.IO.Stream source, System.IO.Pipelines.PipeWriter destination, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
+ }
}
<Compile Include="System\IO\Pipelines\PipeWriter.cs" />
<Compile Include="System\IO\Pipelines\ReadResult.cs" />
<Compile Include="System\IO\Pipelines\ResultFlags.cs" />
+ <Compile Include="System\IO\Pipelines\StreamPipeExtensions.cs" />
<Compile Include="System\IO\Pipelines\ThrowHelper.cs" />
</ItemGroup>
<ItemGroup Condition="'$(TargetGroup)'=='netcoreapp'">
<Compile Include="System\IO\Pipelines\ThreadPoolScheduler.netcoreapp.cs" />
</ItemGroup>
<ItemGroup Condition="'$(TargetGroup)'=='netstandard'">
+ <Compile Include="System\IO\Pipelines\StreamExtensions.netstandard.cs" />
<Compile Include="System\IO\Pipelines\ThreadPoolScheduler.netstandard.cs" />
<Compile Include="System\IO\Pipelines\CancellationTokenExtensions.netstandard.cs" />
</ItemGroup>
<Reference Include="System.Runtime.Extensions" />
<Reference Include="System.Threading" />
<Reference Include="System.Threading.Tasks.Extensions" />
+ <Reference Include="System.Threading.Tasks" />
<Reference Include="System.Threading.ThreadPool" />
</ItemGroup>
</Project>
\ No newline at end of file
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
+using System.Buffers;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// Defines a class that provides access to a read side of pipe.
/// </summary>
- public abstract class PipeReader
+ public abstract partial class PipeReader
{
/// <summary>
/// Attempt to synchronously read data the <see cref="PipeReader"/>.
/// Cancel the pending <see cref="ReadAsync"/> operation. If there is none, cancels next <see cref="ReadAsync"/> operation, without completing the <see cref="PipeWriter"/>.
/// </summary>
public abstract void OnWriterCompleted(Action<Exception, object> callback, object state);
+
+ /// <summary>
+ /// Asynchronously reads the bytes from the <see cref="PipeReader"/> and writes them to the specified stream, using a specified buffer size and cancellation token.
+ /// </summary>
+ /// <param name="destination">The stream to which the contents of the current stream will be copied.</param>
+ /// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
+ /// <returns>A task that represents the asynchronous copy operation.</returns>
+ public virtual Task CopyToAsync(Stream destination, CancellationToken cancellationToken = default)
+ {
+ if (destination == null)
+ {
+ throw new ArgumentNullException(nameof(destination));
+ }
+
+ if (cancellationToken.IsCancellationRequested)
+ {
+ return Task.FromCanceled(cancellationToken);
+ }
+
+ return CopyToAsyncCore(destination, cancellationToken);
+ }
+
+ private async Task CopyToAsyncCore(Stream destination, CancellationToken cancellationToken)
+ {
+ while (true)
+ {
+ SequencePosition consumed = default;
+
+ try
+ {
+ ReadResult result = await ReadAsync(cancellationToken).ConfigureAwait(false);
+ ReadOnlySequence<byte> buffer = result.Buffer;
+ SequencePosition position = buffer.Start;
+
+ if (result.IsCanceled)
+ {
+ throw new OperationCanceledException();
+ }
+
+ while (buffer.TryGet(ref position, out ReadOnlyMemory<byte> memory))
+ {
+ await destination.WriteAsync(memory, cancellationToken).ConfigureAwait(false);
+
+ consumed = position;
+ }
+
+ if (result.IsCompleted)
+ {
+ break;
+ }
+ }
+ finally
+ {
+ // Advance even if WriteAsync throws so the PipeReader is not left in the
+ // currently reading state
+ AdvanceTo(consumed);
+ }
+ }
+ }
}
}
/// <summary>
/// Defines a class that provides a pipeline to which data can be written.
/// </summary>
- public abstract class PipeWriter : IBufferWriter<byte>
+ public abstract partial class PipeWriter : IBufferWriter<byte>
{
/// <summary>
/// Marks the <see cref="PipeWriter"/> as being complete, meaning no more items will be written to it.
this.Write(source.Span);
return FlushAsync(cancellationToken);
}
+
+ /// <summary>
+ /// Asynchronously reads the bytes from the specified stream and writes them to the <see cref="PipeWriter"/>.
+ /// </summary>
+ /// <param name="source">The stream from which the contents will be copied.</param>
+ /// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
+ /// <returns>A task that represents the asynchronous copy operation.</returns>
+ protected internal virtual async Task CopyFromAsync(Stream source, CancellationToken cancellationToken = default)
+ {
+ while (true)
+ {
+ Memory<byte> buffer = GetMemory();
+ int read = await source.ReadAsync(buffer, cancellationToken).ConfigureAwait(false);
+
+ if (read == 0)
+ {
+ break;
+ }
+
+ Advance(read);
+
+ FlushResult result = await FlushAsync(cancellationToken).ConfigureAwait(false);
+
+ if (result.IsCanceled)
+ {
+ throw new OperationCanceledException();
+ }
+
+ if (result.IsCompleted)
+ {
+ break;
+ }
+ }
+ }
}
}
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Buffers;
+using System.Collections.Generic;
+using System.Runtime.InteropServices;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.IO.Pipelines
+{
+ // Helpers to write Memory<byte> to Stream on netstandard 2.0
+ internal static class StreamExtensions
+ {
+ public static ValueTask<int> ReadAsync(this Stream stream, Memory<byte> buffer, CancellationToken cancellationToken = default)
+ {
+ if (MemoryMarshal.TryGetArray(buffer, out ArraySegment<byte> array))
+ {
+ return new ValueTask<int>(stream.ReadAsync(array.Array, array.Offset, array.Count, cancellationToken));
+ }
+ else
+ {
+ byte[] sharedBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length);
+ return FinishReadAsync(stream.ReadAsync(sharedBuffer, 0, buffer.Length, cancellationToken), sharedBuffer, buffer);
+
+ async ValueTask<int> FinishReadAsync(Task<int> readTask, byte[] localBuffer, Memory<byte> localDestination)
+ {
+ try
+ {
+ int result = await readTask.ConfigureAwait(false);
+ new Span<byte>(localBuffer, 0, result).CopyTo(localDestination.Span);
+ return result;
+ }
+ finally
+ {
+ ArrayPool<byte>.Shared.Return(localBuffer);
+ }
+ }
+ }
+ }
+
+ public static ValueTask WriteAsync(this Stream stream, ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
+ {
+ if (MemoryMarshal.TryGetArray(buffer, out ArraySegment<byte> array))
+ {
+ return new ValueTask(stream.WriteAsync(array.Array, array.Offset, array.Count, cancellationToken));
+ }
+ else
+ {
+ byte[] sharedBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length);
+ buffer.Span.CopyTo(sharedBuffer);
+ return new ValueTask(FinishWriteAsync(stream.WriteAsync(sharedBuffer, 0, buffer.Length, cancellationToken), sharedBuffer));
+ }
+ }
+
+ private static async Task FinishWriteAsync(Task writeTask, byte[] localBuffer)
+ {
+ try
+ {
+ await writeTask.ConfigureAwait(false);
+ }
+ finally
+ {
+ ArrayPool<byte>.Shared.Return(localBuffer);
+ }
+ }
+ }
+}
--- /dev/null
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.IO.Pipelines
+{
+ public static class StreamPipeExtensions
+ {
+ /// <summary>
+ /// Asynchronously reads the bytes from the <see cref="Stream"/> and writes them to the specified <see cref="PipeWriter"/>, using a cancellation token.
+ /// </summary>
+ /// <param name="source">The stream from which the contents of the current stream will be copied.</param>
+ /// <param name="destination">The <see cref="PipeWriter"/> to which the contents of the source stream will be copied.</param>
+ /// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
+ /// <returns>A task that represents the asynchronous copy operation.</returns>
+ public static Task CopyToAsync(this Stream source, PipeWriter destination, CancellationToken cancellationToken = default)
+ {
+ if (source == null)
+ {
+ throw new ArgumentNullException(nameof(source));
+ }
+
+ if (destination == null)
+ {
+ throw new ArgumentNullException(nameof(destination));
+ }
+
+ if (cancellationToken.IsCancellationRequested)
+ {
+ return Task.FromCanceled(cancellationToken);
+ }
+
+ return destination.CopyFromAsync(source, cancellationToken);
+ }
+ }
+}
--- /dev/null
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Xunit;
+
+namespace System.IO.Pipelines.Tests
+{
+ public class CopyToAsyncTests
+ {
+ private static readonly PipeOptions s_testOptions = new PipeOptions(readerScheduler: PipeScheduler.Inline);
+
+ [Fact]
+ public async Task CopyToAsyncThrowsArgumentNullExceptionForNullDestination()
+ {
+ var pipe = new Pipe(s_testOptions);
+ var ex = await Assert.ThrowsAsync<ArgumentNullException>(() => pipe.Reader.CopyToAsync(null));
+ Assert.Equal("destination", ex.ParamName);
+ }
+
+ [Fact]
+ public async Task CopyToAsyncThrowsTaskCanceledExceptionForAlreadyCancelledToken()
+ {
+ var pipe = new Pipe(s_testOptions);
+ await Assert.ThrowsAsync<TaskCanceledException>(() => pipe.Reader.CopyToAsync(new MemoryStream(), new CancellationToken(true)));
+ }
+
+ [Fact]
+ public async Task CopyToAsyncWorks()
+ {
+ var helloBytes = Encoding.UTF8.GetBytes("Hello World");
+
+ var pipe = new Pipe(s_testOptions);
+ await pipe.Writer.WriteAsync(helloBytes);
+ pipe.Writer.Complete();
+
+ var stream = new MemoryStream();
+ await pipe.Reader.CopyToAsync(stream);
+ pipe.Reader.Complete();
+
+ Assert.Equal(helloBytes, stream.ToArray());
+ }
+
+ [Fact]
+ public async Task MultiSegmentWritesWorks()
+ {
+ using (var pool = new TestMemoryPool())
+ {
+ var pipe = new Pipe(new PipeOptions(pool: pool, readerScheduler: PipeScheduler.Inline));
+ pipe.Writer.WriteEmpty(4096);
+ pipe.Writer.WriteEmpty(4096);
+ pipe.Writer.WriteEmpty(4096);
+ await pipe.Writer.FlushAsync();
+ pipe.Writer.Complete();
+
+ var stream = new MemoryStream();
+ await pipe.Reader.CopyToAsync(stream);
+ pipe.Reader.Complete();
+
+ Assert.Equal(4096 * 3, stream.Length);
+ }
+ }
+
+ [Fact]
+ public async Task MultiSegmentWritesUntilFailure()
+ {
+ using (var pool = new TestMemoryPool())
+ {
+ var pipe = new Pipe(new PipeOptions(pool: pool, readerScheduler: PipeScheduler.Inline));
+ pipe.Writer.WriteEmpty(4096);
+ pipe.Writer.WriteEmpty(4096);
+ pipe.Writer.WriteEmpty(4096);
+ await pipe.Writer.FlushAsync();
+ pipe.Writer.Complete();
+
+ var stream = new ThrowAfterNWritesStream(2);
+ await Assert.ThrowsAsync<InvalidOperationException>(() => pipe.Reader.CopyToAsync(stream));
+
+ ReadResult result = await pipe.Reader.ReadAsync();
+ Assert.Equal(4096, result.Buffer.Length);
+ pipe.Reader.Complete();
+ }
+ }
+
+ [Fact]
+ public async Task EmptyBufferNotWrittenToStream()
+ {
+ var pipe = new Pipe(s_testOptions);
+ pipe.Writer.Complete();
+
+ var stream = new ThrowingStream();
+ await pipe.Reader.CopyToAsync(stream);
+ pipe.Reader.Complete();
+ }
+
+ [Fact]
+ public async Task CancelingThePendingReadThrowsOperationCancelledException()
+ {
+ var pipe = new Pipe(s_testOptions);
+ var stream = new MemoryStream();
+ Task task = pipe.Reader.CopyToAsync(stream);
+
+ pipe.Reader.CancelPendingRead();
+
+ await Assert.ThrowsAsync<OperationCanceledException>(() => task);
+ }
+
+ [Fact]
+ public async Task CancelingViaCancellationTokenThrowsOperationCancelledException()
+ {
+ var pipe = new Pipe(s_testOptions);
+ var stream = new MemoryStream();
+ var cts = new CancellationTokenSource();
+ Task task = pipe.Reader.CopyToAsync(stream, cts.Token);
+
+ cts.Cancel();
+
+ await Assert.ThrowsAsync<OperationCanceledException>(() => task);
+ }
+
+ [Fact]
+ public async Task CancelingStreamViaCancellationTokenThrowsOperationCancelledException()
+ {
+ var pipe = new Pipe(s_testOptions);
+ var stream = new CancelledWritesStream();
+ var cts = new CancellationTokenSource();
+ Task task = pipe.Reader.CopyToAsync(stream, cts.Token);
+
+ // Call write async inline, this will yield when it hits the tcs
+ pipe.Writer.WriteEmpty(10);
+ await pipe.Writer.FlushAsync();
+
+ // Then cancel
+ cts.Cancel();
+
+ // Now resume the write which should result in an exception
+ stream.WaitForWriteTask.TrySetResult(null);
+
+ await Assert.ThrowsAsync<OperationCanceledException>(() => task);
+ }
+
+ [Fact]
+ public async Task ThrowingFromStreamDoesNotLeavePipeReaderInBrokenState()
+ {
+ var pipe = new Pipe(s_testOptions);
+ var stream = new ThrowingStream();
+ Task task = pipe.Reader.CopyToAsync(stream);
+
+ pipe.Writer.WriteEmpty(10);
+ await pipe.Writer.FlushAsync();
+
+ await Assert.ThrowsAsync<InvalidOperationException>(() => task);
+
+ pipe.Writer.WriteEmpty(10);
+ await pipe.Writer.FlushAsync();
+ pipe.Writer.Complete();
+
+ ReadResult result = await pipe.Reader.ReadAsync();
+ Assert.True(result.IsCompleted);
+ Assert.Equal(20, result.Buffer.Length);
+ pipe.Reader.Complete();
+ }
+
+ private class CancelledWritesStream : WriteOnlyStream
+ {
+ public TaskCompletionSource<object> WaitForWriteTask = new TaskCompletionSource<object>(TaskContinuationOptions.RunContinuationsAsynchronously);
+
+ public override void Write(byte[] buffer, int offset, int count)
+ {
+ throw new NotImplementedException();
+ }
+
+ public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ {
+ await WaitForWriteTask.Task;
+
+ cancellationToken.ThrowIfCancellationRequested();
+ }
+
+#if !netstandard
+ public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
+ {
+ await WaitForWriteTask.Task;
+
+ cancellationToken.ThrowIfCancellationRequested();
+ }
+#endif
+ }
+ private class ThrowingStream : ThrowAfterNWritesStream
+ {
+ public ThrowingStream() : base(0)
+ {
+ }
+ }
+
+ private class ThrowAfterNWritesStream : WriteOnlyStream
+ {
+ private readonly int _maxWrites;
+ private int _writes;
+
+ public ThrowAfterNWritesStream(int maxWrites)
+ {
+ _maxWrites = maxWrites;
+ }
+
+ public override void Write(byte[] buffer, int offset, int count)
+ {
+ throw new InvalidOperationException();
+ }
+
+ public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ {
+ if (_writes >= _maxWrites)
+ {
+ throw new InvalidOperationException();
+ }
+ _writes++;
+ return Task.CompletedTask;
+ }
+
+#if !netstandard
+ public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
+ {
+ if (_writes >= _maxWrites)
+ {
+ throw new InvalidOperationException();
+ }
+ _writes++;
+ return default;
+ }
+#endif
+ }
+
+ private abstract class WriteOnlyStream : Stream
+ {
+ public override bool CanRead => false;
+
+ public override bool CanSeek => false;
+
+ public override bool CanWrite => true;
+
+ public override long Length => throw new NotSupportedException();
+
+ public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
+
+ public override void Flush()
+ {
+ throw new InvalidOperationException();
+ }
+
+ public override int Read(byte[] buffer, int offset, int count)
+ {
+ throw new NotSupportedException();
+ }
+
+ public override long Seek(long offset, SeekOrigin origin)
+ {
+ throw new NotSupportedException();
+ }
+
+ public override void SetLength(long value)
+ {
+ throw new NotSupportedException();
+ }
+ }
+ }
+}
--- /dev/null
+using System;
+using System.Buffers;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Xunit;
+
+namespace System.IO.Pipelines.Tests
+{
+ public class PipeWriterCopyToAsyncTests
+ {
+ [Fact]
+ public async Task CopyToAsyncThrowsArgumentNullExceptionForNullSource()
+ {
+ var pipe = new Pipe();
+ MemoryStream stream = null;
+ var ex = await Assert.ThrowsAsync<ArgumentNullException>(() => stream.CopyToAsync(pipe.Writer));
+ Assert.Equal("source", ex.ParamName);
+ }
+
+ [Fact]
+ public async Task CopyToAsyncThrowsArgumentNullExceptionForNullDestination()
+ {
+ var stream = new MemoryStream();
+ var ex = await Assert.ThrowsAsync<ArgumentNullException>(() => stream.CopyToAsync(null));
+ Assert.Equal("destination", ex.ParamName);
+ }
+
+ [Fact]
+ public async Task CopyToAsyncThrowsTaskCanceledExceptionForAlreadyCancelledToken()
+ {
+ var pipe = new Pipe();
+ await Assert.ThrowsAsync<TaskCanceledException>(() => new MemoryStream().CopyToAsync(pipe.Writer, new CancellationToken(true)));
+ }
+
+ [Fact]
+ public async Task CopyToAsyncWorks()
+ {
+ var helloBytes = Encoding.UTF8.GetBytes("Hello World");
+
+ var pipe = new Pipe();
+ var stream = new MemoryStream(helloBytes);
+ await stream.CopyToAsync(pipe.Writer);
+
+ ReadResult result = await pipe.Reader.ReadAsync();
+
+ Assert.Equal(helloBytes, result.Buffer.ToArray());
+
+ pipe.Reader.AdvanceTo(result.Buffer.End);
+ pipe.Reader.Complete();
+ pipe.Writer.Complete();
+ }
+
+ [Fact]
+ public async Task CopyToAsyncCalledMultipleTimesWorks()
+ {
+ var hello = "Hello World";
+ var helloBytes = Encoding.UTF8.GetBytes(hello);
+ var expected = Encoding.UTF8.GetBytes(hello + hello + hello);
+
+ var pipe = new Pipe();
+ await new MemoryStream(helloBytes).CopyToAsync(pipe.Writer);
+ await new MemoryStream(helloBytes).CopyToAsync(pipe.Writer);
+ await new MemoryStream(helloBytes).CopyToAsync(pipe.Writer);
+ pipe.Writer.Complete();
+
+ ReadResult result = await pipe.Reader.ReadAsync();
+
+ Assert.Equal(expected, result.Buffer.ToArray());
+
+ pipe.Reader.AdvanceTo(result.Buffer.End);
+ pipe.Reader.Complete();
+ }
+
+ [Fact]
+ public async Task StreamCopyToAsyncWorks()
+ {
+ var helloBytes = Encoding.UTF8.GetBytes("Hello World");
+
+ var pipe = new Pipe();
+ var stream = new MemoryStream(helloBytes);
+ await stream.CopyToAsync(pipe.Writer);
+
+ ReadResult result = await pipe.Reader.ReadAsync();
+
+ Assert.Equal(helloBytes, result.Buffer.ToArray());
+
+ pipe.Reader.AdvanceTo(result.Buffer.End);
+ pipe.Reader.Complete();
+ }
+
+ [Fact]
+ public async Task CancelingViaCancelPendingFlushThrows()
+ {
+ var helloBytes = Encoding.UTF8.GetBytes("Hello World");
+
+ var pipe = new Pipe(new PipeOptions(pauseWriterThreshold: helloBytes.Length - 1, resumeWriterThreshold: 0));
+ var stream = new MemoryStream(helloBytes);
+ Task task = stream.CopyToAsync(pipe.Writer);
+
+ Assert.False(task.IsCompleted);
+
+ pipe.Writer.CancelPendingFlush();
+
+ await Assert.ThrowsAsync<OperationCanceledException>(() => task);
+
+ pipe.Writer.Complete();
+ pipe.Reader.Complete();
+ }
+
+ [Fact]
+ public async Task CancelingViaCancellationTokenThrows()
+ {
+ var helloBytes = Encoding.UTF8.GetBytes("Hello World");
+
+ var pipe = new Pipe(new PipeOptions(pauseWriterThreshold: helloBytes.Length - 1, resumeWriterThreshold: 0));
+ var stream = new MemoryStream(helloBytes);
+ var cts = new CancellationTokenSource();
+ Task task = stream.CopyToAsync(pipe.Writer, cts.Token);
+
+ Assert.False(task.IsCompleted);
+
+ cts.Cancel();
+
+ await Assert.ThrowsAsync<OperationCanceledException>(() => task);
+
+ pipe.Writer.Complete();
+ pipe.Reader.Complete();
+ }
+
+ [Fact]
+ public async Task CancelingStreamViaCancellationTokenThrows()
+ {
+ var pipe = new Pipe();
+ var stream = new CancelledReadsStream();
+ var cts = new CancellationTokenSource();
+ Task task = stream.CopyToAsync(pipe.Writer, cts.Token);
+
+ Assert.False(task.IsCompleted);
+
+ cts.Cancel();
+
+ stream.WaitForReadTask.TrySetResult(null);
+
+ await Assert.ThrowsAsync<OperationCanceledException>(() => task);
+
+ pipe.Writer.Complete();
+ pipe.Reader.Complete();
+ }
+
+ private class CancelledReadsStream : ReadOnlyStream
+ {
+ public TaskCompletionSource<object> WaitForReadTask = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
+
+ public override int Read(byte[] buffer, int offset, int count)
+ {
+ throw new NotImplementedException();
+ }
+
+ public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ {
+ await WaitForReadTask.Task;
+
+ cancellationToken.ThrowIfCancellationRequested();
+
+ return 0;
+ }
+
+#if !netstandard
+ public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
+ {
+ await WaitForReadTask.Task;
+
+ cancellationToken.ThrowIfCancellationRequested();
+
+ return 0;
+ }
+#endif
+ }
+
+ private abstract class ReadOnlyStream : Stream
+ {
+ public override bool CanRead => true;
+
+ public override bool CanSeek => false;
+
+ public override bool CanWrite => false;
+
+ public override long Length => throw new NotSupportedException();
+
+ public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
+
+ public override void Flush()
+ {
+ throw new NotSupportedException();
+ }
+
+ public override long Seek(long offset, SeekOrigin origin)
+ {
+ throw new NotSupportedException();
+ }
+
+ public override void SetLength(long value)
+ {
+ throw new NotSupportedException();
+ }
+
+ public override void Write(byte[] buffer, int offset, int count)
+ {
+ throw new NotSupportedException();
+ }
+ }
+ }
+}
</ItemGroup>
<ItemGroup>
<Compile Include="BackpressureTests.cs" />
+ <Compile Include="PipeReaderCopyToAsyncTests.cs" />
<Compile Include="FlushAsyncCancellationTests.cs" />
<Compile Include="FlushAsyncCompletionTests.cs" />
<Compile Include="FlushAsyncTests.cs" />
<Compile Include="PipePoolTests.cs" />
<Compile Include="PipeResetTests.cs" />
<Compile Include="PipeTest.cs" />
+ <Compile Include="PipeWriterCopyToAsyncTests.cs" />
<Compile Include="PipeWriterTests.cs" />
<Compile Include="ReadAsyncCancellationTests.cs" />
<Compile Include="ReadAsyncCompletionTests.cs" />