public delegate Task<int> ReadAsyncDelegate(Stream stream, byte[] data);
[Theory]
+ [InlineData(false)]
+ [InlineData(true)]
+ public async Task DisposingPipeReaderStreamCompletesPipeReader(bool dataInPipe)
+ {
+ var pipe = new Pipe();
+ Stream s = pipe.Reader.AsStream();
+
+ if (dataInPipe)
+ {
+ await pipe.Writer.WriteAsync(new byte[42]);
+ await pipe.Writer.FlushAsync();
+ }
+
+ var readerCompletedTask = new TaskCompletionSource<bool>();
+ pipe.Writer.OnReaderCompleted(delegate { readerCompletedTask.SetResult(true); }, null);
+
+ // Call Dispose{Async} multiple times; all should succeed.
+ for (int i = 0; i < 2; i++)
+ {
+ s.Dispose();
+ await s.DisposeAsync();
+ }
+
+ // Make sure OnReaderCompleted was invoked.
+ await readerCompletedTask.Task;
+
+ // Unable to read after disposing.
+ await Assert.ThrowsAsync<InvalidOperationException>(async () => await s.ReadAsync(new byte[1]));
+
+ // Writes still work.
+ await pipe.Writer.WriteAsync(new byte[1]);
+ }
+
+ [Theory]
[MemberData(nameof(ReadCalls))]
public async Task ReadingFromPipeReaderStreamReadsFromUnderlyingPipeReader(ReadAsyncDelegate readAsync)
{
Assert.Same(stream, pipeReader.AsStream());
}
+ [Fact]
+ public async Task PipeWriterStreamProducesToConsumingPipeReaderStream()
+ {
+ var pipe = new Pipe();
+
+ int consumedSum = 0, producedSum = 0;
+ Task consumer = Task.Run(() =>
+ {
+ using (Stream reader = pipe.Reader.AsStream())
+ {
+ int b;
+ while ((b = reader.ReadByte()) != -1)
+ {
+ consumedSum += b;
+ }
+
+ Assert.Equal(-1, reader.ReadByte());
+ }
+ });
+
+ var rand = new Random();
+ using (Stream writer = pipe.Writer.AsStream())
+ {
+ for (int i = 0; i < 1000; i++)
+ {
+ byte b = (byte)rand.Next(256);
+ writer.WriteByte(b);
+ producedSum += b;
+ }
+ }
+
+ await consumer;
+ Assert.Equal(producedSum, consumedSum);
+ }
+
public class BuggyPipeReader : PipeReader
{
public override void AdvanceTo(SequencePosition consumed)
{
public delegate Task WriteAsyncDelegate(Stream stream, byte[] data);
+ [Fact]
+ public async Task DisposingPipeWriterStreamCompletesPipeWriter()
+ {
+ var pipe = new Pipe();
+ Stream s = pipe.Writer.AsStream();
+
+ var writerCompletedTask = new TaskCompletionSource<bool>();
+ pipe.Reader.OnWriterCompleted(delegate { writerCompletedTask.SetResult(true); }, null);
+
+ // Call Dispose{Async} multiple times; all should succeed.
+ for (int i = 0; i < 2; i++)
+ {
+ s.Dispose();
+ await s.DisposeAsync();
+ }
+
+ // Make sure OnWriterCompleted was invoked.
+ await writerCompletedTask.Task;
+
+ // Unable to write after disposing.
+ await Assert.ThrowsAsync<InvalidOperationException>(async () => await s.WriteAsync(new byte[1]));
+
+ // Reads still work and return 0.
+ ReadResult rr = await pipe.Reader.ReadAsync();
+ Assert.True(rr.IsCompleted);
+ Assert.Equal(0, rr.Buffer.Length);
+ }
+
[Theory]
[MemberData(nameof(WriteCalls))]
public async Task WritingToPipeStreamWritesToUnderlyingPipeWriter(WriteAsyncDelegate writeAsync)