From 975489f69dd3aa854963d4dc650844abb90171ab Mon Sep 17 00:00:00 2001 From: David Fowler Date: Tue, 19 Feb 2019 14:24:50 -0800 Subject: [PATCH] Added AsStream to PipeReader and PipeWriter (dotnet/corefx#35399) - This adds a new virtual member to PipeReader and PipeWriter to get a read only or write only stream from the PipeReader and PipeWriter - This introduces a new field on the base types - Added tests Commit migrated from https://github.com/dotnet/corefx/commit/968a6a4450ca5acd3aed6a6f6e1f8b445348e729 --- .../System.IO.Pipelines/ref/System.IO.Pipelines.cs | 2 + .../System.IO.Pipelines/src/Resources/Strings.resx | 65 ++-- .../src/System.IO.Pipelines.csproj | 5 + .../src/System/IO/Pipelines/PipeReader.cs | 13 +- .../src/System/IO/Pipelines/PipeReaderStream.cs | 114 +++++++ .../src/System/IO/Pipelines/PipeWriter.cs | 13 +- .../src/System/IO/Pipelines/PipeWriterStream.cs | 103 +++++++ .../src/System/IO/Pipelines/ThrowHelper.cs | 12 + .../tests/PipeReaderStreamTests.nonnetstandard.cs | 328 +++++++++++++++++++++ .../tests/PipeWriterStreamTests.nonnetstandard.cs | 219 ++++++++++++++ .../tests/System.IO.Pipelines.Tests.csproj | 2 + 11 files changed, 846 insertions(+), 30 deletions(-) create mode 100644 src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeReaderStream.cs create mode 100644 src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeWriterStream.cs create mode 100644 src/libraries/System.IO.Pipelines/tests/PipeReaderStreamTests.nonnetstandard.cs create mode 100644 src/libraries/System.IO.Pipelines/tests/PipeWriterStreamTests.nonnetstandard.cs diff --git a/src/libraries/System.IO.Pipelines/ref/System.IO.Pipelines.cs b/src/libraries/System.IO.Pipelines/ref/System.IO.Pipelines.cs index 7382572..e786214 100644 --- a/src/libraries/System.IO.Pipelines/ref/System.IO.Pipelines.cs +++ b/src/libraries/System.IO.Pipelines/ref/System.IO.Pipelines.cs @@ -44,6 +44,7 @@ namespace System.IO.Pipelines protected PipeReader() { } public abstract void AdvanceTo(System.SequencePosition consumed); public abstract void AdvanceTo(System.SequencePosition consumed, System.SequencePosition examined); + public virtual System.IO.Stream AsStream() { throw null; } public abstract void CancelPendingRead(); public abstract void Complete(System.Exception exception = null); public virtual System.Threading.Tasks.Task CopyToAsync(System.IO.Stream destination, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } @@ -62,6 +63,7 @@ namespace System.IO.Pipelines { protected PipeWriter() { } public abstract void Advance(int bytes); + public virtual System.IO.Stream AsStream() { throw null; } public abstract void CancelPendingFlush(); public abstract void Complete(System.Exception exception = null); protected internal virtual System.Threading.Tasks.Task CopyFromAsync(System.IO.Stream source, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } diff --git a/src/libraries/System.IO.Pipelines/src/Resources/Strings.resx b/src/libraries/System.IO.Pipelines/src/Resources/Strings.resx index 7af100a..6657d16 100644 --- a/src/libraries/System.IO.Pipelines/src/Resources/Strings.resx +++ b/src/libraries/System.IO.Pipelines/src/Resources/Strings.resx @@ -1,17 +1,17 @@  - @@ -135,15 +135,24 @@ Concurrent reads or writes are not supported. + + Flush was canceled on underlying PipeWriter. + Can't GetResult unless awaiter is completed. + + The PipeReader returned 0 bytes when the ReadResult was not completed or canceled. + No reading operation to complete. No writing operation. Make sure GetMemory() was called. + + Read was canceled on underlying PipeReader. + Both reader and writer has to be completed to be able to reset the pipe. @@ -156,4 +165,4 @@ Writing is not allowed after writer was completed. - + \ No newline at end of file diff --git a/src/libraries/System.IO.Pipelines/src/System.IO.Pipelines.csproj b/src/libraries/System.IO.Pipelines/src/System.IO.Pipelines.csproj index 2758ac3..4f81ddc 100644 --- a/src/libraries/System.IO.Pipelines/src/System.IO.Pipelines.csproj +++ b/src/libraries/System.IO.Pipelines/src/System.IO.Pipelines.csproj @@ -4,6 +4,9 @@ netcoreapp-Debug;netcoreapp-Release;netstandard-Debug;netstandard-Release + + Common\CoreLib\System\Threading\Tasks\TaskToApm.cs + @@ -20,8 +23,10 @@ + + diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeReader.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeReader.cs index 85e4899..f5b0db7 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeReader.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeReader.cs @@ -13,6 +13,8 @@ namespace System.IO.Pipelines /// public abstract partial class PipeReader { + private PipeReaderStream _stream; + /// /// Attempt to synchronously read data the . /// @@ -49,6 +51,15 @@ namespace System.IO.Pipelines public abstract void AdvanceTo(SequencePosition consumed, SequencePosition examined); /// + /// + /// + /// + public virtual Stream AsStream() + { + return _stream ?? (_stream = new PipeReaderStream(this)); + } + + /// /// Cancel to currently pending or if none is pending next call to , without completing the . /// public abstract void CancelPendingRead(); @@ -99,7 +110,7 @@ namespace System.IO.Pipelines if (result.IsCanceled) { - throw new OperationCanceledException(); + ThrowHelper.ThrowOperationCanceledException_ReadCanceled(); } while (buffer.TryGet(ref position, out ReadOnlyMemory memory)) diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeReaderStream.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeReaderStream.cs new file mode 100644 index 0000000..6b449db --- /dev/null +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeReaderStream.cs @@ -0,0 +1,114 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; +using System.Buffers; +using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; + +namespace System.IO.Pipelines +{ + internal sealed class PipeReaderStream : Stream + { + private readonly PipeReader _pipeReader; + + public PipeReaderStream(PipeReader pipeReader) + { + Debug.Assert(pipeReader != null); + _pipeReader = pipeReader; + } + + public override bool CanRead => true; + + public override bool CanSeek => false; + + public override bool CanWrite => false; + + public override long Length => throw new NotSupportedException(); + + public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); } + + public override void Flush() + { + } + + public override int Read(byte[] buffer, int offset, int count) + { + return ReadAsync(buffer, offset, count).GetAwaiter().GetResult(); + } + + public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException(); + + public override void SetLength(long value) => throw new NotSupportedException(); + + public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException(); + + public sealed override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state) => + TaskToApm.Begin(ReadAsync(buffer, offset, count, default), callback, state); + + public sealed override int EndRead(IAsyncResult asyncResult) => + TaskToApm.End(asyncResult); + + public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + return ReadAsyncInternal(new Memory(buffer, offset, count), cancellationToken).AsTask(); + } + +#if !netstandard + public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) + { + return ReadAsyncInternal(buffer, cancellationToken); + } +#endif + + private async ValueTask ReadAsyncInternal(Memory buffer, CancellationToken cancellationToken) + { + ReadResult result = await _pipeReader.ReadAsync(cancellationToken).ConfigureAwait(false); + + if (result.IsCanceled) + { + ThrowHelper.ThrowOperationCanceledException_ReadCanceled(); + } + + ReadOnlySequence sequence = result.Buffer; + long bufferLength = sequence.Length; + SequencePosition consumed = sequence.Start; + + try + { + if (bufferLength != 0) + { + int actual = (int)Math.Min(bufferLength, buffer.Length); + + ReadOnlySequence slice = actual == bufferLength ? sequence : sequence.Slice(0, actual); + consumed = slice.End; + slice.CopyTo(buffer.Span); + + return actual; + } + + if (result.IsCompleted) + { + return 0; + } + } + finally + { + _pipeReader.AdvanceTo(consumed); + } + + // This is a buggy PipeReader implementation that returns 0 byte reads even though the PipeReader + // isn't completed or canceled + ThrowHelper.ThrowInvalidOperationException_InvalidZeroByteRead(); + return 0; + } + + public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken) + { + // Delegate to CopyToAsync on the PipeReader + return _pipeReader.CopyToAsync(destination, cancellationToken); + } + } +} diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeWriter.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeWriter.cs index 821fa9e..036ac1b 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeWriter.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeWriter.cs @@ -13,6 +13,8 @@ namespace System.IO.Pipelines /// public abstract partial class PipeWriter : IBufferWriter { + private PipeWriterStream _stream; + /// /// Marks the as being complete, meaning no more items will be written to it. /// @@ -44,6 +46,15 @@ namespace System.IO.Pipelines public abstract Span GetSpan(int sizeHint = 0); /// + /// + /// + /// + public virtual Stream AsStream() + { + return _stream ?? (_stream = new PipeWriterStream(this)); + } + + /// /// Writes to the pipe and makes data accessible to /// public virtual ValueTask WriteAsync(ReadOnlyMemory source, CancellationToken cancellationToken = default) @@ -76,7 +87,7 @@ namespace System.IO.Pipelines if (result.IsCanceled) { - throw new OperationCanceledException(); + ThrowHelper.ThrowOperationCanceledException_FlushCanceled(); } if (result.IsCompleted) diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeWriterStream.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeWriterStream.cs new file mode 100644 index 0000000..cdab755 --- /dev/null +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeWriterStream.cs @@ -0,0 +1,103 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; + +namespace System.IO.Pipelines +{ + internal sealed class PipeWriterStream : Stream + { + private readonly PipeWriter _pipeWriter; + + public PipeWriterStream(PipeWriter pipeWriter) + { + Debug.Assert(pipeWriter != null); + _pipeWriter = pipeWriter; + } + + public override bool CanRead => false; + + public override bool CanSeek => false; + + public override bool CanWrite => true; + + public override long Length => throw new NotSupportedException(); + + public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); } + + public override void Flush() + { + FlushAsync().GetAwaiter().GetResult(); + } + + public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException(); + + public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException(); + + public override void SetLength(long value) => throw new NotSupportedException(); + + public sealed override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state) => + TaskToApm.Begin(WriteAsync(buffer, offset, count, default), callback, state); + + public sealed override void EndWrite(IAsyncResult asyncResult) => + TaskToApm.End(asyncResult); + + public override void Write(byte[] buffer, int offset, int count) + { + WriteAsync(buffer, offset, count).GetAwaiter().GetResult(); + } + + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + ValueTask valueTask = _pipeWriter.WriteAsync(new ReadOnlyMemory(buffer, offset, count), cancellationToken); + + return GetFlushResultAsTask(valueTask); + } + +#if !netstandard + public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) + { + ValueTask valueTask = _pipeWriter.WriteAsync(buffer, cancellationToken); + + return new ValueTask(GetFlushResultAsTask(valueTask)); + } +#endif + + public override Task FlushAsync(CancellationToken cancellationToken) + { + ValueTask valueTask = _pipeWriter.FlushAsync(cancellationToken); + + return GetFlushResultAsTask(valueTask); + } + + private static Task GetFlushResultAsTask(ValueTask valueTask) + { + if (valueTask.IsCompletedSuccessfully) + { + FlushResult result = valueTask.Result; + if (result.IsCanceled) + { + ThrowHelper.ThrowOperationCanceledException_FlushCanceled(); + } + + return Task.CompletedTask; + } + + static async Task AwaitTask(ValueTask valueTask) + { + FlushResult result = await valueTask.ConfigureAwait(false); + + if (result.IsCanceled) + { + ThrowHelper.ThrowOperationCanceledException_FlushCanceled(); + } + } + + return AwaitTask(valueTask); + } + } +} + diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/ThrowHelper.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/ThrowHelper.cs index 5b5b51d..c9bcff4 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/ThrowHelper.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/ThrowHelper.cs @@ -60,6 +60,18 @@ namespace System.IO.Pipelines public static void ThrowInvalidOperationException_ResetIncompleteReaderWriter() => throw CreateInvalidOperationException_ResetIncompleteReaderWriter(); [MethodImpl(MethodImplOptions.NoInlining)] public static Exception CreateInvalidOperationException_ResetIncompleteReaderWriter() => new InvalidOperationException(SR.ReaderAndWriterHasToBeCompleted); + + public static void ThrowOperationCanceledException_ReadCanceled() => throw CreateOperationCanceledException_ReadCanceled(); + [MethodImpl(MethodImplOptions.NoInlining)] + public static Exception CreateOperationCanceledException_ReadCanceled() => new OperationCanceledException(SR.ReadCanceledOnPipeReader); + + public static void ThrowOperationCanceledException_FlushCanceled() => throw CreateOperationCanceledException_FlushCanceled(); + [MethodImpl(MethodImplOptions.NoInlining)] + public static Exception CreateOperationCanceledException_FlushCanceled() => new OperationCanceledException(SR.FlushCanceledOnPipeWriter); + + public static void ThrowInvalidOperationException_InvalidZeroByteRead() => throw CreateInvalidOperationException_InvalidZeroByteRead(); + [MethodImpl(MethodImplOptions.NoInlining)] + public static Exception CreateInvalidOperationException_InvalidZeroByteRead() => new InvalidOperationException(SR.InvalidZeroByteRead); } internal enum ExceptionArgument diff --git a/src/libraries/System.IO.Pipelines/tests/PipeReaderStreamTests.nonnetstandard.cs b/src/libraries/System.IO.Pipelines/tests/PipeReaderStreamTests.nonnetstandard.cs new file mode 100644 index 0000000..85491b2 --- /dev/null +++ b/src/libraries/System.IO.Pipelines/tests/PipeReaderStreamTests.nonnetstandard.cs @@ -0,0 +1,328 @@ +using System; +using System.Buffers; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace System.IO.Pipelines.Tests +{ + public class PipeReaderStreamTests + { + public delegate Task ReadAsyncDelegate(Stream stream, byte[] data); + + [Theory] + [MemberData(nameof(ReadCalls))] + public async Task ReadingFromPipeReaderStreamReadsFromUnderlyingPipeReader(ReadAsyncDelegate readAsync) + { + byte[] helloBytes = Encoding.ASCII.GetBytes("Hello World"); + var pipe = new Pipe(); + await pipe.Writer.WriteAsync(helloBytes); + pipe.Writer.Complete(); + + var stream = new PipeReaderStream(pipe.Reader); + + var buffer = new byte[1024]; + int read = await readAsync(stream, buffer); + + Assert.Equal(helloBytes, buffer.AsSpan(0, read).ToArray()); + pipe.Reader.Complete(); + } + + [Theory] + [MemberData(nameof(ReadCalls))] + public async Task AsStreamReturnsPipeReaderStream(ReadAsyncDelegate readAsync) + { + byte[] helloBytes = Encoding.ASCII.GetBytes("Hello World"); + var pipe = new Pipe(); + await pipe.Writer.WriteAsync(helloBytes); + pipe.Writer.Complete(); + + Stream stream = pipe.Reader.AsStream(); + + var buffer = new byte[1024]; + int read = await readAsync(stream, buffer); + + Assert.Equal(helloBytes, buffer.AsSpan(0, read).ToArray()); + pipe.Reader.Complete(); + } + + [Fact] + public async Task ReadingWithSmallerBufferWorks() + { + byte[] helloBytes = Encoding.ASCII.GetBytes("Hello World"); + var pipe = new Pipe(); + await pipe.Writer.WriteAsync(helloBytes); + pipe.Writer.Complete(); + + Stream stream = pipe.Reader.AsStream(); + + var buffer = new byte[5]; + int read = await stream.ReadAsync(buffer); + + Assert.Equal(5, read); + Assert.Equal(helloBytes.AsSpan(0, 5).ToArray(), buffer); + + buffer = new byte[3]; + read = await stream.ReadAsync(buffer); + + Assert.Equal(3, read); + Assert.Equal(helloBytes.AsSpan(5, 3).ToArray(), buffer); + + // Verify that the buffer is partially consumed and we can read the rest from the PipeReader directly + ReadResult result = await pipe.Reader.ReadAsync(); + Assert.Equal(helloBytes.AsSpan(8).ToArray(), result.Buffer.ToArray()); + pipe.Reader.AdvanceTo(result.Buffer.End); + + pipe.Reader.Complete(); + } + + [Fact] + public async Task EndOfPipeReaderReturnsZeroBytesFromReadAsync() + { + var pipe = new Pipe(); + Memory memory = pipe.Writer.GetMemory(); + pipe.Writer.Advance(5); + pipe.Writer.Complete(); + + Stream stream = pipe.Reader.AsStream(); + + var buffer = new byte[5]; + var read = await stream.ReadAsync(buffer); + + Assert.Equal(5, read); + + read = await stream.ReadAsync(buffer); + + // Read again to make sure it always returns 0 + Assert.Equal(0, read); + + pipe.Reader.Complete(); + } + + [Fact] + public async Task BuggyPipeReaderImplementationThrows() + { + var pipeReader = new BuggyPipeReader(); + + Stream stream = pipeReader.AsStream(); + + await Assert.ThrowsAsync(async () => await stream.ReadAsync(new byte[5])); + } + + [Fact] + public async Task WritingToPipeReaderStreamThrowsNotSupported() + { + var pipe = new Pipe(); + + Stream stream = pipe.Reader.AsStream(); + Assert.False(stream.CanWrite); + Assert.False(stream.CanSeek); + Assert.True(stream.CanRead); + Assert.Throws(() => { long length = stream.Length; }); + Assert.Throws(() => { long position = stream.Position; }); + Assert.Throws(() => stream.Seek(0, SeekOrigin.Begin)); + Assert.Throws(() => stream.Write(new byte[10], 0, 10)); + await Assert.ThrowsAsync(() => stream.WriteAsync(new byte[10], 0, 10)); + await Assert.ThrowsAsync(() => stream.WriteAsync(new byte[10]).AsTask()); + + pipe.Reader.Complete(); + pipe.Writer.Complete(); + } + + [Fact] + public async Task CancellingPendingReadThrowsOperationCancelledException() + { + var pipe = new Pipe(); + + Stream stream = pipe.Reader.AsStream(); + ValueTask task = stream.ReadAsync(new byte[1024]); + Assert.False(task.IsCompleted); + + pipe.Reader.CancelPendingRead(); + + await Assert.ThrowsAsync(async () => await task); + pipe.Writer.Complete(); + pipe.Reader.Complete(); + } + + [Fact] + public async Task CanReadAfterCancellingPendingRead() + { + var pipe = new Pipe(); + + Stream stream = pipe.Reader.AsStream(); + ValueTask task = stream.ReadAsync(new byte[1024]); + Assert.False(task.IsCompleted); + + pipe.Reader.CancelPendingRead(); + + await Assert.ThrowsAsync(async () => await task); + pipe.Writer.Complete(); + + ReadResult result = await pipe.Reader.ReadAsync(); + Assert.True(result.IsCompleted); + + pipe.Reader.Complete(); + } + + [Fact] + public async Task CancellationTokenFlowsToUnderlyingPipeReader() + { + var pipe = new Pipe(); + + Stream stream = pipe.Reader.AsStream(); + var cts = new CancellationTokenSource(); + ValueTask task = stream.ReadAsync(new byte[1024], cts.Token); + Assert.False(task.IsCompleted); + + cts.Cancel(); + + await Assert.ThrowsAsync(async () => await task); + pipe.Writer.Complete(); + pipe.Reader.Complete(); + } + + [Fact] + public async Task DefaultPipeReaderImplementationReturnsPipeReaderStream() + { + var pipeReader = new TestPipeReader(); + Stream stream = pipeReader.AsStream(); + + await stream.ReadAsync(new byte[10]); + + Assert.True(pipeReader.ReadCalled); + Assert.True(pipeReader.AdvanceToCalled); + } + + [Fact] + public void AsStreamReturnsSameInstance() + { + var pipeReader = new TestPipeReader(); + Stream stream = pipeReader.AsStream(); + + Assert.Same(stream, pipeReader.AsStream()); + } + + public class BuggyPipeReader : PipeReader + { + public override void AdvanceTo(SequencePosition consumed) + { + + } + + public override void AdvanceTo(SequencePosition consumed, SequencePosition examined) + { + + } + + public override void CancelPendingRead() + { + throw new NotImplementedException(); + } + + public override void Complete(Exception exception = null) + { + throw new NotImplementedException(); + } + + public override void OnWriterCompleted(Action callback, object state) + { + throw new NotImplementedException(); + } + + public override ValueTask ReadAsync(CancellationToken cancellationToken = default) + { + // Returns a ReadResult with no buffer and with IsCompleted and IsCancelled false + return default; + } + + public override bool TryRead(out ReadResult result) + { + throw new NotImplementedException(); + } + } + + public class TestPipeReader : PipeReader + { + public bool ReadCalled { get; set; } + public bool AdvanceToCalled { get; set; } + + public override void AdvanceTo(SequencePosition consumed) + { + AdvanceToCalled = true; + } + + public override void AdvanceTo(SequencePosition consumed, SequencePosition examined) + { + throw new NotImplementedException(); + } + + public override void CancelPendingRead() + { + throw new NotImplementedException(); + } + + public override void Complete(Exception exception = null) + { + throw new NotImplementedException(); + } + + public override void OnWriterCompleted(Action callback, object state) + { + throw new NotImplementedException(); + } + + public override ValueTask ReadAsync(CancellationToken cancellationToken = default) + { + ReadCalled = true; + return new ValueTask(new ReadResult(default, isCanceled: false, isCompleted: true)); + } + + public override bool TryRead(out ReadResult result) + { + throw new NotImplementedException(); + } + } + + public static IEnumerable ReadCalls + { + get + { + ReadAsyncDelegate readArrayAsync = (stream, data) => + { + return stream.ReadAsync(data, 0, data.Length); + }; + + ReadAsyncDelegate readMemoryAsync = async (stream, data) => + { + return await stream.ReadAsync(data); + }; + + ReadAsyncDelegate readMemoryAsyncWithThreadHop = async (stream, data) => + { + await Task.Yield(); + + return await stream.ReadAsync(data); + }; + + ReadAsyncDelegate readArraySync = (stream, data) => + { + return Task.FromResult(stream.Read(data, 0, data.Length)); + }; + + ReadAsyncDelegate readSpanSync = (stream, data) => + { + return Task.FromResult(stream.Read(data)); + }; + + yield return new object[] { readArrayAsync }; + yield return new object[] { readMemoryAsync }; + yield return new object[] { readMemoryAsyncWithThreadHop }; + yield return new object[] { readArraySync }; + yield return new object[] { readSpanSync }; + } + } + } +} diff --git a/src/libraries/System.IO.Pipelines/tests/PipeWriterStreamTests.nonnetstandard.cs b/src/libraries/System.IO.Pipelines/tests/PipeWriterStreamTests.nonnetstandard.cs new file mode 100644 index 0000000..9d66340 --- /dev/null +++ b/src/libraries/System.IO.Pipelines/tests/PipeWriterStreamTests.nonnetstandard.cs @@ -0,0 +1,219 @@ +using System; +using System.Buffers; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace System.IO.Pipelines.Tests +{ + public class PipeWriterStreamTests + { + public delegate Task WriteAsyncDelegate(Stream stream, byte[] data); + + [Theory] + [MemberData(nameof(WriteCalls))] + public async Task WritingToPipeStreamWritesToUnderlyingPipeWriter(WriteAsyncDelegate writeAsync) + { + byte[] helloBytes = Encoding.ASCII.GetBytes("Hello World"); + var pipe = new Pipe(); + var stream = new PipeWriterStream(pipe.Writer); + + await writeAsync(stream, helloBytes); + + ReadResult result = await pipe.Reader.ReadAsync(); + Assert.Equal(helloBytes, result.Buffer.ToArray()); + pipe.Reader.Complete(); + pipe.Writer.Complete(); + } + + [Theory] + [MemberData(nameof(WriteCalls))] + public async Task AsStreamReturnsPipeWriterStream(WriteAsyncDelegate writeAsync) + { + byte[] helloBytes = Encoding.ASCII.GetBytes("Hello World"); + var pipe = new Pipe(); + Stream stream = pipe.Writer.AsStream(); + + await writeAsync(stream, helloBytes); + + ReadResult result = await pipe.Reader.ReadAsync(); + Assert.Equal(helloBytes, result.Buffer.ToArray()); + pipe.Reader.Complete(); + pipe.Writer.Complete(); + } + + [Fact] + public async Task FlushAsyncFlushesBufferedData() + { + byte[] helloBytes = Encoding.ASCII.GetBytes("Hello World"); + var pipe = new Pipe(); + + Memory memory = pipe.Writer.GetMemory(); + helloBytes.CopyTo(memory); + pipe.Writer.Advance(helloBytes.Length); + + Stream stream = pipe.Writer.AsStream(); + await stream.FlushAsync(); + + ReadResult result = await pipe.Reader.ReadAsync(); + Assert.Equal(helloBytes, result.Buffer.ToArray()); + pipe.Reader.Complete(); + pipe.Writer.Complete(); + } + + [Fact] + public async Task ReadingFromPipeWriterStreamThrowsNotSupported() + { + byte[] helloBytes = Encoding.ASCII.GetBytes("Hello World"); + var pipe = new Pipe(); + + Stream stream = pipe.Writer.AsStream(); + Assert.True(stream.CanWrite); + Assert.False(stream.CanSeek); + Assert.False(stream.CanRead); + Assert.Throws(() => { long length = stream.Length; }); + Assert.Throws(() => { long position = stream.Position; }); + Assert.Throws(() => stream.Seek(0, SeekOrigin.Begin)); + Assert.Throws(() => stream.Read(new byte[10], 0, 10)); + await Assert.ThrowsAsync(() => stream.ReadAsync(new byte[10], 0, 10)); + await Assert.ThrowsAsync(() => stream.ReadAsync(new byte[10]).AsTask()); + await Assert.ThrowsAsync(() => stream.CopyToAsync(Stream.Null)); + + pipe.Reader.Complete(); + pipe.Writer.Complete(); + } + + [Fact] + public async Task CancellingPendingFlushThrowsOperationCancelledException() + { + var pipe = new Pipe(new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 0)); + byte[] helloBytes = Encoding.ASCII.GetBytes("Hello World"); + + Stream stream = pipe.Writer.AsStream(); + ValueTask task = stream.WriteAsync(helloBytes); + Assert.False(task.IsCompleted); + + pipe.Writer.CancelPendingFlush(); + + await Assert.ThrowsAsync(async () => await task); + pipe.Writer.Complete(); + pipe.Reader.Complete(); + } + + [Fact] + public async Task CancellationTokenFlowsToUnderlyingPipeWriter() + { + var pipe = new Pipe(new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 0)); + byte[] helloBytes = Encoding.ASCII.GetBytes("Hello World"); + + Stream stream = pipe.Writer.AsStream(); + var cts = new CancellationTokenSource(); + ValueTask task = stream.WriteAsync(helloBytes, cts.Token); + Assert.False(task.IsCompleted); + + cts.Cancel(); + + await Assert.ThrowsAsync(async () => await task); + pipe.Writer.Complete(); + pipe.Reader.Complete(); + } + + [Fact] + public async Task DefaultPipeWriterImplementationReturnsPipeWriterStream() + { + var pipeWriter = new TestPipeWriter(); + Stream stream = pipeWriter.AsStream(); + + await stream.WriteAsync(new byte[10]); + + Assert.True(pipeWriter.WriteAsyncCalled); + + await stream.FlushAsync(); + + Assert.True(pipeWriter.FlushCalled); + } + + public class TestPipeWriter : PipeWriter + { + public bool FlushCalled { get; set; } + public bool WriteAsyncCalled { get; set; } + + public override void Advance(int bytes) + { + throw new NotImplementedException(); + } + + public override void CancelPendingFlush() + { + throw new NotImplementedException(); + } + + public override void Complete(Exception exception = null) + { + throw new NotImplementedException(); + } + + public override ValueTask FlushAsync(CancellationToken cancellationToken = default) + { + FlushCalled = true; + return default; + } + + public override Memory GetMemory(int sizeHint = 0) + { + throw new NotImplementedException(); + } + + public override Span GetSpan(int sizeHint = 0) + { + throw new NotImplementedException(); + } + + public override void OnReaderCompleted(Action callback, object state) + { + throw new NotImplementedException(); + } + + public override ValueTask WriteAsync(ReadOnlyMemory source, CancellationToken cancellationToken = default) + { + WriteAsyncCalled = true; + return default; + } + } + + public static IEnumerable WriteCalls + { + get + { + WriteAsyncDelegate writeArrayAsync = (stream, data) => + { + return stream.WriteAsync(data, 0, data.Length); + }; + + WriteAsyncDelegate writeMemoryAsync = async (stream, data) => + { + await stream.WriteAsync(data); + }; + + WriteAsyncDelegate writeArraySync = (stream, data) => + { + stream.Write(data, 0, data.Length); + return Task.CompletedTask; + }; + + WriteAsyncDelegate writeSpanSync = (stream, data) => + { + stream.Write(data); + return Task.CompletedTask; + }; + + yield return new object[] { writeArrayAsync }; + yield return new object[] { writeMemoryAsync }; + yield return new object[] { writeArraySync }; + yield return new object[] { writeSpanSync }; + } + } + } +} diff --git a/src/libraries/System.IO.Pipelines/tests/System.IO.Pipelines.Tests.csproj b/src/libraries/System.IO.Pipelines/tests/System.IO.Pipelines.Tests.csproj index 2f6f1e5..da672ba 100644 --- a/src/libraries/System.IO.Pipelines/tests/System.IO.Pipelines.Tests.csproj +++ b/src/libraries/System.IO.Pipelines/tests/System.IO.Pipelines.Tests.csproj @@ -35,5 +35,7 @@ + + \ No newline at end of file -- 2.7.4